Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/14 17:19:24 UTC

flink git commit: [FLINK-1753] [streaming] Added Kafka broker failure test

Repository: flink
Updated Branches:
  refs/heads/master e5a3b95a2 -> cb34e976d


[FLINK-1753] [streaming] Added Kafka broker failure test

This closes #589


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb34e976
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb34e976
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb34e976

Branch: refs/heads/master
Commit: cb34e976d7c71e87a3fd8c4902f761c70cc453ec
Parents: e5a3b95
Author: Gábor Hermann <re...@gmail.com>
Authored: Mon Mar 23 14:18:55 2015 +0100
Committer: Robert Metzger <rm...@apache.org>
Committed: Tue Apr 14 15:33:27 2015 +0200

----------------------------------------------------------------------
 docs/streaming_guide.md                         |  23 +-
 .../connectors/kafka/api/KafkaSink.java         |  86 ++++---
 .../kafka/api/simple/KafkaTopicUtils.java       | 152 +++++++++++--
 .../kafka/api/simple/PersistentKafkaSource.java |   8 +-
 .../KafkaMultiplePartitionsIterator.java        |  16 +-
 .../iterator/KafkaSinglePartitionIterator.java  | 142 +++++++++---
 .../kafka/api/simple/offset/KafkaOffset.java    |  24 +-
 .../streaming/connectors/kafka/KafkaITCase.java | 223 +++++++++++++++----
 .../connectors/kafka/KafkaTopicUtilsTest.java   | 152 +++++++++++++
 .../kafka/util/KafkaLocalSystemTime.java        |  48 ++++
 10 files changed, 730 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/docs/streaming_guide.md
----------------------------------------------------------------------
diff --git a/docs/streaming_guide.md b/docs/streaming_guide.md
index 3a9a1c4..47b4509 100644
--- a/docs/streaming_guide.md
+++ b/docs/streaming_guide.md
@@ -1257,12 +1257,12 @@ Example:
 <div class="codetabs" markdown="1">
 <div data-lang="java" markdown="1">
 {% highlight java %}
-stream.addSink(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
+stream.addSource(new PersistentKafkaSource<String>("localhost:2181", "test", new SimpleStringSchema()));
 {% endhighlight %}
 </div>
 <div data-lang="scala" markdown="1">
 {% highlight scala %}
-stream.addSink(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
+stream.addSource(new PersistentKafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
 {% endhighlight %}
 </div>
 </div>
@@ -1291,6 +1291,25 @@ stream.addSink(new KafkaSink[String]("localhost:2181", "test", new SimpleStringS
 </div>
 </div>
 
+The user can also define a custom Kafka producer configuration for the KafkaSink using the following constructor:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
+      SerializationSchema<IN, byte[]> serializationSchema)
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
+      SerializationSchema serializationSchema)
+{% endhighlight %}
+</div>
+</div>
+
+If this constructor is used, the user needs to make sure to set the brokers with the "metadata.broker.list" property. Also, the serializer configuration should be left at its default; the serialization should be set via the SerializationSchema.
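+
+For example, a minimal sketch of passing a custom configuration to the sink (the broker address "localhost:9092" is only an illustrative assumption):
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+Properties producerConfig = new Properties();
+// assumed broker address; overrides the broker list discovered via Zookeeper
+producerConfig.put("metadata.broker.list", "localhost:9092");
+stream.addSink(new KafkaSink<String>("localhost:2181", "test", producerConfig, new SimpleStringSchema()));
+{% endhighlight %}
+</div>
+</div>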
+
 More about Kafka can be found [here](https://kafka.apache.org/documentation.html).
 
 [Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
index 0bbf9a7..376d96f 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.kafka.api;
 
+import java.util.Map;
 import java.util.Properties;
 
 import org.apache.flink.api.java.ClosureCleaner;
@@ -27,6 +28,8 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.NetUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
 
@@ -45,8 +48,10 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 
 	private static final long serialVersionUID = 1L;
 
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
+
 	private Producer<IN, byte[]> producer;
-	private Properties props;
+	private Properties userDefinedProperties;
 	private String topicId;
 	private String zookeeperAddress;
 	private SerializationSchema<IN, byte[]> schema;
@@ -54,8 +59,8 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	private Class<? extends SerializableKafkaPartitioner> partitionerClass = null;
 
 	/**
-	 * Creates a KafkaSink for a given topic. The partitioner distributes the
-	 * messages between the partitions of the topics.
+	 * Creates a KafkaSink for a given topic. The sink produces its input to
+	 * the topic.
 	 *
 	 * @param zookeeperAddress
 	 * 		Address of the Zookeeper host (with port number).
@@ -64,27 +69,27 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	 * @param serializationSchema
 	 * 		User defined serialization schema.
 	 */
-	@SuppressWarnings({ "rawtypes", "unchecked" })
 	public KafkaSink(String zookeeperAddress, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema) {
-		this(zookeeperAddress, topicId, serializationSchema, (Class) null);
+		this(zookeeperAddress, topicId, new Properties(), serializationSchema);
 	}
 
 	/**
-	 * Creates a KafkaSink for a given topic. The sink produces its input into
-	 * the topic.
+	 * Creates a KafkaSink for a given topic with custom Producer configuration.
+	 * If you use this constructor, the broker should be set with the "metadata.broker.list"
+	 * configuration.
 	 *
 	 * @param zookeeperAddress
 	 * 		Address of the Zookeeper host (with port number).
 	 * @param topicId
 	 * 		ID of the Kafka topic.
+	 * @param producerConfig
+	 * 		Configuration of the Kafka producer.
 	 * @param serializationSchema
 	 * 		User defined serialization schema.
-	 * @param partitioner
-	 * 		User defined partitioner.
 	 */
-	public KafkaSink(String zookeeperAddress, String topicId,
-			SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
+	public KafkaSink(String zookeeperAddress, String topicId, Properties producerConfig,
+			SerializationSchema<IN, byte[]> serializationSchema) {
 		NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
 		Preconditions.checkNotNull(topicId, "TopicID not set");
 		ClosureCleaner.ensureSerializable(partitioner);
@@ -92,18 +97,32 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 		this.zookeeperAddress = zookeeperAddress;
 		this.topicId = topicId;
 		this.schema = serializationSchema;
+		this.partitionerClass = null;
+		this.userDefinedProperties = producerConfig;
+	}
+
+	/**
+	 * Creates a KafkaSink for a given topic. The sink produces its input to
+	 * the topic.
+	 *
+	 * @param zookeeperAddress
+	 * 		Address of the Zookeeper host (with port number).
+	 * @param topicId
+	 * 		ID of the Kafka topic.
+	 * @param serializationSchema
+	 * 		User defined serialization schema.
+	 * @param partitioner
+	 * 		User defined partitioner.
+	 */
+	public KafkaSink(String zookeeperAddress, String topicId,
+			SerializationSchema<IN, byte[]> serializationSchema, SerializableKafkaPartitioner partitioner) {
+		this(zookeeperAddress, topicId, serializationSchema);
 		this.partitioner = partitioner;
 	}
 
 	public KafkaSink(String zookeeperAddress, String topicId,
 			SerializationSchema<IN, byte[]> serializationSchema, Class<? extends SerializableKafkaPartitioner> partitioner) {
-		NetUtils.ensureCorrectHostnamePort(zookeeperAddress);
-		Preconditions.checkNotNull(topicId, "TopicID not set");
-		ClosureCleaner.ensureSerializable(partitioner);
-
-		this.zookeeperAddress = zookeeperAddress;
-		this.topicId = topicId;
-		this.schema = serializationSchema;
+		this(zookeeperAddress, topicId, serializationSchema);
 		this.partitionerClass = partitioner;
 	}
 
@@ -114,33 +133,42 @@ public class KafkaSink<IN> extends RichSinkFunction<IN> {
 	public void open(Configuration configuration) {
 
 		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperAddress);
-		String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId);
+		String listOfBrokers = kafkaTopicUtils.getBrokerList(topicId);
 
-		props = new Properties();
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Broker list: {}", listOfBrokers);
+		}
+
+		Properties properties = new Properties();
 
-		props.put("metadata.broker.list", brokerAddress);
-		props.put("request.required.acks", "1");
+		properties.put("metadata.broker.list", listOfBrokers);
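+		// acks = -1 makes the broker wait for all in-sync replicas to acknowledge each
+		// message, and failed sends are retried, so messages survive a broker failure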
+		properties.put("request.required.acks", "-1");
+		properties.put("message.send.max.retries", "10");
 
-		props.put("serializer.class", DefaultEncoder.class.getCanonicalName());
+		properties.put("serializer.class", DefaultEncoder.class.getCanonicalName());
 
 		// this will not be used as the key will not be serialized
-		props.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
+		properties.put("key.serializer.class", DefaultEncoder.class.getCanonicalName());
+
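+		// user-defined properties override the defaults set above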
+		for (Map.Entry<Object, Object> propertiesEntry : userDefinedProperties.entrySet()) {
+			properties.put(propertiesEntry.getKey(), propertiesEntry.getValue());
+		}
 
 		if (partitioner != null) {
-			props.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
+			properties.put("partitioner.class", PartitionerWrapper.class.getCanonicalName());
 			// java serialization will do the rest.
-			props.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
+			properties.put(PartitionerWrapper.SERIALIZED_WRAPPER_NAME, partitioner);
 		}
 		if (partitionerClass != null) {
-			props.put("partitioner.class", partitionerClass);
+			properties.put("partitioner.class", partitionerClass);
 		}
 
-		ProducerConfig config = new ProducerConfig(props);
+		ProducerConfig config = new ProducerConfig(properties);
 
 		try {
 			producer = new Producer<IN, byte[]>(config);
 		} catch (NullPointerException e) {
-			throw new RuntimeException("Cannot connect to Kafka broker " + brokerAddress, e);
+			throw new RuntimeException("Cannot connect to Kafka broker " + listOfBrokers, e);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
index 9e09ea8..365961d 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java
@@ -19,7 +19,10 @@ package org.apache.flink.streaming.connectors.kafka.api.simple;
 
 import java.io.UnsupportedEncodingException;
 import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Properties;
+import java.util.Set;
 
 import org.I0Itec.zkclient.ZkClient;
 import org.I0Itec.zkclient.exception.ZkMarshallingError;
@@ -31,6 +34,8 @@ import kafka.admin.AdminUtils;
 import kafka.api.PartitionMetadata;
 import kafka.api.TopicMetadata;
 import kafka.cluster.Broker;
+import kafka.common.LeaderNotAvailableException;
+import kafka.common.UnknownTopicOrPartitionException;
 import scala.collection.JavaConversions;
 import scala.collection.Seq;
 
@@ -42,22 +47,29 @@ public class KafkaTopicUtils {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtils.class);
 
-	private final ZkClient zkClient;
+	private ZkClient zkClient;
 
 	public static final int DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS = 10000;
 	public static final int DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS = 10000;
 
+	private final String zookeeperAddress;
+	private final int sessionTimeoutMs;
+	private final int connectionTimeoutMs;
+
+	private volatile boolean isRunning = false;
+
 	public KafkaTopicUtils(String zookeeperServer) {
 		this(zookeeperServer, DEFAULT_ZOOKEEPER_SESSION_TIMEOUT_MS, DEFAULT_ZOOKEEPER_CONNECTION_TIMEOUT_MS);
 	}
 
 	public KafkaTopicUtils(String zookeeperAddress, int sessionTimeoutMs, int connectionTimeoutMs) {
-		zkClient = new ZkClient(zookeeperAddress, sessionTimeoutMs, connectionTimeoutMs,
-				new KafkaZKStringSerializer());
-		zkClient.waitUntilConnected();
+		this.zookeeperAddress = zookeeperAddress;
+		this.sessionTimeoutMs = sessionTimeoutMs;
+		this.connectionTimeoutMs = connectionTimeoutMs;
 	}
 
 	public void createTopic(String topicName, int numOfPartitions, int replicationFactor) {
 		LOG.info("Creating Kafka topic '{}'", topicName);
 		Properties topicConfig = new Properties();
 		if (topicExists(topicName)) {
@@ -65,36 +77,150 @@ public class KafkaTopicUtils {
 				LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
 			}
 		} else {
+			LOG.info("Connecting to Zookeeper");
+
+			initZkClient();
 			AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+			closeZkClient();
 		}
 	}
 
+	public String getBrokerList(String topicName) {
+		return getBrokerAddressList(getBrokerAddresses(topicName));
+	}
+
+	public String getBrokerList(String topicName, int partitionId) {
+		return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
+	}
+
+	public Set<String> getBrokerAddresses(String topicName) {
+		int numOfPartitions = getNumberOfPartitions(topicName);
+
+		HashSet<String> brokers = new HashSet<String>();
+		for (int i = 0; i < numOfPartitions; i++) {
+			brokers.addAll(getBrokerAddresses(topicName, i));
+		}
+		return brokers;
+	}
+
+	public Set<String> getBrokerAddresses(String topicName, int partitionId) {
+		PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
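+		// only brokers currently in the in-sync replica set (ISR) of the partition are returned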
+		Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
+
+		HashSet<String> addresses = new HashSet<String>();
+		for (Broker broker : inSyncReplicas) {
+			addresses.add(broker.connectionString());
+		}
+		return addresses;
+	}
+
+	private static String getBrokerAddressList(Set<String> brokerAddresses) {
+		StringBuilder brokerAddressList = new StringBuilder();
+		for (String broker : brokerAddresses) {
+			brokerAddressList.append(broker);
+			brokerAddressList.append(',');
+		}
+		// drop the trailing comma; guards against an empty broker set
+		if (brokerAddressList.length() > 0) {
+			brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
+		}
+
+		return brokerAddressList.toString();
+	}
+
 	public int getNumberOfPartitions(String topicName) {
-		Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
+		Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
 		return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
 	}
 
-	public String getLeaderBrokerAddressForTopic(String topicName) {
-		TopicMetadata topicInfo = getTopicInfo(topicName);
+	public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
+		isRunning = true;
+		PartitionMetadata partitionMetadata = null;
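+		// retry until the partition has an available leader again
+		// (e.g. while a new leader is being elected after a broker failure)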
+		while (isRunning) {
+			try {
+				partitionMetadata = getPartitionMetadata(topicName, partitionId);
+				return partitionMetadata;
+			} catch (LeaderNotAvailableException e) {
+				if (LOG.isDebugEnabled()) {
+					LOG.debug("Got {}, trying to fetch metadata again", e.getMessage());
+				}
+			}
+		}
+		isRunning = false;
+		return partitionMetadata;
+	}
+
+	public PartitionMetadata getPartitionMetadata(String topicName, int partitionId) {
+		PartitionMetadata partitionMetadata = getPartitionMetadataWithErrorCode(topicName, partitionId);
+		switch (partitionMetadata.errorCode()) {
+			case 0: // NoError
+				return partitionMetadata;
+			case 3: // UnknownTopicOrPartitionCode
+				throw new UnknownTopicOrPartitionException("While fetching metadata for " + topicName + " / " + partitionId);
+			case 5: // LeaderNotAvailableCode
+				throw new LeaderNotAvailableException("While fetching metadata for " + topicName + " / " + partitionId);
+			default:
+				throw new RuntimeException("Unknown error occurred while fetching metadata for "
+						+ topicName + " / " + partitionId + ", with error code: " + partitionMetadata.errorCode());
+		}
+	}
+
+	private PartitionMetadata getPartitionMetadataWithErrorCode(String topicName, int partitionId) {
+		TopicMetadata topicInfo = getTopicMetadata(topicName);
 
 		Collection<PartitionMetadata> partitions = JavaConversions.asJavaCollection(topicInfo.partitionsMetadata());
-		PartitionMetadata partitionMetadata = partitions.iterator().next();
 
-		Broker leader = JavaConversions.asJavaCollection(partitionMetadata.isr()).iterator().next();
+		for (PartitionMetadata partition : partitions) {
+			if (partition.partitionId() == partitionId) {
+				return partition;
+			}
+		}
 
-		return leader.connectionString();
+		throw new RuntimeException("No such partition: " + topicName + " / " + partitionId);
 	}
 
-	public TopicMetadata getTopicInfo(String topicName) {
+	public TopicMetadata getTopicMetadata(String topicName) {
+		TopicMetadata topicMetadata = getTopicMetadataWithErrorCode(topicName);
+		switch (topicMetadata.errorCode()) {
+			case 0: // NoError
+				return topicMetadata;
+			case 3: // UnknownTopicOrPartitionCode
+				throw new UnknownTopicOrPartitionException("While fetching metadata for topic " + topicName);
+			case 5: // LeaderNotAvailableCode
+				throw new LeaderNotAvailableException("While fetching metadata for topic " + topicName);
+			default:
+				throw new RuntimeException("Unknown error occurred while fetching metadata for topic "
+						+ topicName + ", with error code: " + topicMetadata.errorCode());
+		}
+	}
+
+	private TopicMetadata getTopicMetadataWithErrorCode(String topicName) {
 		if (topicExists(topicName)) {
-			return AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
+			initZkClient();
+			TopicMetadata topicMetadata = AdminUtils.fetchTopicMetadataFromZk(topicName, zkClient);
+			closeZkClient();
+
+			return topicMetadata;
 		} else {
 			throw new RuntimeException("Topic does not exist: " + topicName);
 		}
 	}
 
 	public boolean topicExists(String topicName) {
-		return AdminUtils.topicExists(zkClient, topicName);
+		initZkClient();
+		boolean topicExists = AdminUtils.topicExists(zkClient, topicName);
+		closeZkClient();
+
+		return topicExists;
+	}
+
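+	// every operation opens and closes its own short-lived ZkClient connection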
+	private void initZkClient() {
+		zkClient = new ZkClient(zookeeperAddress, sessionTimeoutMs, connectionTimeoutMs,
+				new KafkaZKStringSerializer());
+		zkClient.waitUntilConnected();
+	}
+
+	private void closeZkClient() {
+		zkClient.close();
+		zkClient = null;
 	}
 
 	private static class KafkaZKStringSerializer implements ZkSerializer {

http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index 45dc1c4..7cd8a28 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -167,8 +167,6 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 
 		int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
 
-		String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId);
-
 		if (indexOfSubtask >= numberOfPartitions) {
 			iterator = new KafkaIdleConsumerIterator();
 		} else {
@@ -188,7 +186,7 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 				context.registerState("kafka", kafkaOffSet);
 			}
 
-			iterator = getMultiKafkaIterator(brokerAddress, topicId, partitions, waitOnEmptyFetchMillis);
+			iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, waitOnEmptyFetchMillis, connectTimeoutMs, bufferSize);
 
 			if (LOG.isInfoEnabled()) {
 				LOG.info("KafkaSource ({}/{}) listening to partitions {} of topic {}.",
@@ -199,10 +197,6 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 		iterator.initialize();
 	}
 
-	protected KafkaConsumerIterator getMultiKafkaIterator(String hostName, String topic, Map<Integer, KafkaOffset> partitionsWithOffset, int waitOnEmptyFetch) {
-		return new KafkaMultiplePartitionsIterator(hostName, topic, partitionsWithOffset, waitOnEmptyFetch, this.connectTimeoutMs, this.bufferSize);
-	}
-
 	@Override
 	public void run(Collector<OUT> collector) throws Exception {
 		isRunning = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
index 40e1ff2..b76421e 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
 import org.slf4j.Logger;
@@ -33,25 +34,22 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 	protected List<KafkaSinglePartitionIterator> partitions;
 	protected final int waitOnEmptyFetch;
 
-	public KafkaMultiplePartitionsIterator(String hostName, String topic,
+	public KafkaMultiplePartitionsIterator(String topic,
 										Map<Integer, KafkaOffset> partitionsWithOffset,
+										KafkaTopicUtils kafkaTopicUtils,
 										int waitOnEmptyFetch, int connectTimeoutMs, int bufferSize) {
 		partitions = new ArrayList<KafkaSinglePartitionIterator>(partitionsWithOffset.size());
 
-		String[] hostAndPort = hostName.split(":");
-
-		String host = hostAndPort[0];
-		int port = Integer.parseInt(hostAndPort[1]);
-
 		this.waitOnEmptyFetch = waitOnEmptyFetch;
 
 		for (Map.Entry<Integer, KafkaOffset> partitionWithOffset : partitionsWithOffset.entrySet()) {
 			partitions.add(new KafkaSinglePartitionIterator(
-					host,
-					port,
 					topic,
 					partitionWithOffset.getKey(),
-					partitionWithOffset.getValue(), connectTimeoutMs, bufferSize));
+					partitionWithOffset.getValue(),
+					kafkaTopicUtils,
+					connectTimeoutMs,
+					bufferSize));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
index cf49e43..6e326e5 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
@@ -19,11 +19,14 @@ package org.apache.flink.streaming.connectors.kafka.api.simple.iterator;
 
 import java.io.Serializable;
 import java.nio.ByteBuffer;
+import java.nio.channels.ClosedChannelException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
@@ -32,7 +35,9 @@ import org.slf4j.LoggerFactory;
 
 import kafka.api.FetchRequest;
 import kafka.api.FetchRequestBuilder;
+import kafka.cluster.Broker;
 import kafka.common.ErrorMapping;
+import kafka.common.NotLeaderForPartitionException;
 import kafka.javaapi.FetchResponse;
 import kafka.javaapi.PartitionMetadata;
 import kafka.javaapi.TopicMetadata;
@@ -53,34 +58,43 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 
 	private List<String> hosts;
 	private String topic;
-	private int port;
 	private int partition;
 	private long readOffset;
 	private transient SimpleConsumer consumer;
 	private List<String> replicaBrokers;
 	private String clientName;
-	private String leadBroker;
+	private Broker leadBroker;
 	private final int connectTimeoutMs;
 	private final int bufferSize;
 
 	private KafkaOffset initialOffset;
 	private transient Iterator<MessageAndOffset> iter;
 	private transient FetchResponse fetchResponse;
+	private volatile boolean isRunning;
 
 	/**
 	 * Constructor with configurable wait time on empty fetch. For connecting to the Kafka service
 	 * we use the so called simple or low level Kafka API thus directly connecting to one of the brokers.
 	 *
-	 * @param hostName Hostname of a known Kafka broker
-	 * @param port Port of the known Kafka broker
-	 * @param topic Name of the topic to listen to
-	 * @param partition Partition in the chosen topic
+	 * @param topic
+	 * 		Name of the topic to listen to
+	 * @param partition
+	 * 		Partition in the chosen topic
+	 * @param initialOffset
+	 * 		Offset to start consuming at
+	 * @param kafkaTopicUtils
+	 * 		Util for receiving topic metadata
+	 * @param connectTimeoutMs
+	 * 		Connection timeout in milliseconds
+	 * @param bufferSize
+	 * 		Size of buffer
 	 */
-	public KafkaSinglePartitionIterator(String hostName, int port, String topic, int partition, KafkaOffset initialOffset,
-										int connectTimeoutMs, int bufferSize) {
-		this.hosts = new ArrayList<String>();
-		hosts.add(hostName);
-		this.port = port;
+	public KafkaSinglePartitionIterator(String topic, int partition, KafkaOffset initialOffset,
+			KafkaTopicUtils kafkaTopicUtils, int connectTimeoutMs, int bufferSize) {
+
+		Set<String> brokerAddresses = kafkaTopicUtils.getBrokerAddresses(topic, partition);
+		this.hosts = new ArrayList<String>(brokerAddresses);
+
 		this.connectTimeoutMs = connectTimeoutMs;
 		this.bufferSize = bufferSize;
 		this.topic = topic;
@@ -88,7 +102,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 
 		this.initialOffset = initialOffset;
 
-		replicaBrokers = new ArrayList<String>();
+		this.replicaBrokers = new ArrayList<String>();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -100,29 +114,55 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	 * the topic and establishing a connection to it.
 	 */
 	public void initialize() throws InterruptedException {
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts);
+		}
+
 		PartitionMetadata metadata;
+		isRunning = true;
 		do {
-			metadata = findLeader(hosts, port, topic, partition);
+			metadata = findLeader(hosts, topic, partition);
 			try {
 				Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
 			} catch (InterruptedException e) {
 				throw new InterruptedException("Establishing connection to Kafka failed");
 			}
-		} while (metadata == null);
+		} while (isRunning && metadata == null);
+		isRunning = false;
 
 		if (metadata.leader() == null) {
-			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts.get(0)
-					+ ":" + port);
+			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
 		}
 
-		leadBroker = metadata.leader().host();
+		leadBroker = metadata.leader();
 		clientName = "Client_" + topic + "_" + partition;
 
-		consumer = new SimpleConsumer(leadBroker, port, connectTimeoutMs, bufferSize, clientName);
+		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), connectTimeoutMs, bufferSize, clientName);
+
+		try {
+			readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+		} catch (NotLeaderForPartitionException e) {
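+			// the cached leader is no longer valid (e.g. the broker failed);
+			// wait until a new leader has been elected, then fetch the offset again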
+			do {
+
+				metadata = findLeader(hosts, topic, partition);
 
-		readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+				try {
+					Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
+				} catch (InterruptedException ie) {
+					throw new InterruptedException("Establishing connection to Kafka failed");
+				}
+			} while (metadata == null);
+			readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+		}
 
-		resetFetchResponse(readOffset);
+		try {
+			resetFetchResponse(readOffset);
+		} catch (ClosedChannelException e) {
+			if (LOG.isWarnEnabled()) {
+				LOG.warn("Got ClosedChannelException, trying to find new leader.");
+			}
+			findNewLeader();
+		}
 	}
 
 	/**
@@ -161,7 +201,14 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	public boolean fetchHasNext() throws InterruptedException {
 		synchronized (fetchResponse) {
 			if (!iter.hasNext()) {
-				resetFetchResponse(readOffset);
+				try {
+					resetFetchResponse(readOffset);
+				} catch (ClosedChannelException e) {
+					if (LOG.isWarnEnabled()) {
+						LOG.warn("Got ClosedChannelException, trying to find new leader.");
+					}
+					findNewLeader();
+				}
 				return iter.hasNext();
 			} else {
 				return true;
@@ -205,7 +252,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	//  Internal utilities
 	// --------------------------------------------------------------------------------------------
 
-	private void resetFetchResponse(long offset) throws InterruptedException {
+	private void resetFetchResponse(long offset) throws InterruptedException, ClosedChannelException {
 		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
 				.addFetch(topic, partition, offset, 100000).build();
 
@@ -225,24 +272,43 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 
 				readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
 			}
-			consumer.close();
-			consumer = null;
-			leadBroker = findNewLeader(leadBroker, topic, partition, port);
+
+			findNewLeader();
 		}
 
 		iter = fetchResponse.messageSet(topic, partition).iterator();
 	}
 
-	private PartitionMetadata findLeader(List<String> a_hosts, int a_port, String a_topic,
+	private void findNewLeader() throws InterruptedException {
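+		// drop the connection to the failed leader and connect to the newly elected one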
+		consumer.close();
+		consumer = null;
+		leadBroker = findNewLeader(leadBroker, topic, partition);
+		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), connectTimeoutMs, bufferSize, clientName);
+	}
+
+	@SuppressWarnings("ConstantConditions")
+	private PartitionMetadata findLeader(List<String> addresses, String a_topic,
 			int a_partition) {
+
 		PartitionMetadata returnMetaData = null;
 		loop:
-		for (String seed : a_hosts) {
+		for (String address : addresses) {
+
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("Trying to find leader via broker: {}", address);
+			}
+
+			String[] split = address.split(":");
+			String host = split[0];
+			int port = Integer.parseInt(split[1]);
+
 			SimpleConsumer consumer = null;
 			try {
-				consumer = new SimpleConsumer(seed, a_port, connectTimeoutMs, bufferSize, "leaderLookup");
+				consumer = new SimpleConsumer(host, port, connectTimeoutMs, bufferSize, "leaderLookup");
 				List<String> topics = Collections.singletonList(a_topic);
+
 				TopicMetadataRequest req = new TopicMetadataRequest(topics);
+
 				kafka.javaapi.TopicMetadataResponse resp = consumer.send(req);
 
 				List<TopicMetadata> metaData = resp.topicsMetadata();
@@ -255,8 +321,13 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 					}
 				}
 			} catch (Exception e) {
-				throw new RuntimeException("Error communicating with Broker [" + seed
-						+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
+				if (e instanceof ClosedChannelException) {
+					LOG.warn("Got ClosedChannelException while trying to communicate with Broker " +
+							"[{}] to find Leader for [{}, {}]. Trying other replicas.", address, a_topic, a_partition);
+				} else {
+					throw new RuntimeException("Error communicating with Broker [" + address
+							+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
+				}
 			} finally {
 				if (consumer != null) {
 					consumer.close();
@@ -266,30 +337,31 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		if (returnMetaData != null) {
 			replicaBrokers.clear();
 			for (kafka.cluster.Broker replica : returnMetaData.replicas()) {
-				replicaBrokers.add(replica.host());
+				replicaBrokers.add(replica.host() + ":" + replica.port());
 			}
 		}
 		return returnMetaData;
 	}
 
-	private String findNewLeader(String a_oldLeader, String a_topic, int a_partition, int a_port) throws InterruptedException {
+	@SuppressWarnings({"ConstantConditions", "UnusedAssignment"})
+	private Broker findNewLeader(Broker a_oldLeader, String a_topic, int a_partition) throws InterruptedException {
 		for (int i = 0; i < 3; i++) {
 			if (LOG.isInfoEnabled()) {
 				LOG.info("Trying to find a new leader after Broker failure.");
 			}
 			boolean goToSleep = false;
-			PartitionMetadata metadata = findLeader(replicaBrokers, a_port, a_topic, a_partition);
+			PartitionMetadata metadata = findLeader(replicaBrokers, a_topic, a_partition);
 			if (metadata == null) {
 				goToSleep = true;
 			} else if (metadata.leader() == null) {
 				goToSleep = true;
-			} else if (a_oldLeader.equalsIgnoreCase(metadata.leader().host()) && i == 0) {
+			} else if (a_oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
 				// first time through if the leader hasn't changed give ZooKeeper a second to recover
 				// second time, assume the broker did recover before failover, or it was a non-Broker issue
 				//
 				goToSleep = true;
 			} else {
-				return metadata.leader().host();
+				return metadata.leader();
 			}
 			if (goToSleep) {
 				try {

http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
index 4dfd314..ac45e32 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
@@ -21,6 +21,9 @@ import java.io.Serializable;
 import java.util.HashMap;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import kafka.api.PartitionOffsetRequestInfo;
 import kafka.common.TopicAndPartition;
 import kafka.javaapi.OffsetResponse;
@@ -28,6 +31,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
 
 public abstract class KafkaOffset implements Serializable {
 
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class);
+
 	private static final long serialVersionUID = 1L;
 
 	public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
@@ -38,14 +43,27 @@ public abstract class KafkaOffset implements Serializable {
 		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
 		Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
 		requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
+
 		kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo,
 				kafka.api.OffsetRequest.CurrentVersion(), clientName);
 		OffsetResponse response = consumer.getOffsetsBefore(request);
 
-		if (response.hasError()) {
-			throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
-					+ response.errorCode(topic, partition));
+		while (response.hasError()) {
+			switch (response.errorCode(topic, partition)) {
+				case 6: // NotLeaderForPartitionCode
+				case 3: // UnknownTopicOrPartitionCode
+					LOG.warn("Tried to fetch offsets from a broker that is not the leader for the partition. Retrying.");
+					break;
+				default:
+					throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
+							+ response.errorCode(topic, partition));
+			}
+
+			request = new kafka.javaapi.OffsetRequest(requestInfo,
+					kafka.api.OffsetRequest.CurrentVersion(), clientName);
+			response = consumer.getOffsetsBefore(request);
 		}
+
 		long[] offsets = response.offsets(topic, partition);
 		return offsets[0];
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 9344722..b416839 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -24,7 +24,9 @@ import java.io.File;
 import java.io.IOException;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.List;
 import java.util.Properties;
 
 import org.apache.commons.lang.SerializationUtils;
@@ -43,6 +45,7 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.Offset;
 import org.apache.flink.streaming.connectors.kafka.partitioner.SerializableKafkaPartitioner;
+import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
 import org.apache.flink.streaming.util.serialization.DeserializationSchema;
 import org.apache.flink.streaming.util.serialization.JavaDefaultStringSchema;
 import org.apache.flink.streaming.util.serialization.SerializationSchema;
@@ -58,7 +61,6 @@ import org.slf4j.LoggerFactory;
 
 import kafka.server.KafkaConfig;
 import kafka.server.KafkaServer;
-import kafka.utils.Time;
 
 /**
  * Code in this test is based on the following GitHub repository:
@@ -70,9 +72,9 @@ import kafka.utils.Time;
 public class KafkaITCase {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaITCase.class);
+	private static final int NUMBER_OF_KAFKA_SERVERS = 3;
 
 	private static int zkPort;
-	private static int kafkaPort;
 	private static String kafkaHost;
 
 	private static String zookeeperConnectionString;
@@ -80,30 +82,39 @@ public class KafkaITCase {
 	@ClassRule
 	public static TemporaryFolder tempFolder = new TemporaryFolder();
 	public static File tmpZkDir;
-	public static File tmpKafkaDir;
+	public static List<File> tmpKafkaDirs;
 
 	private static TestingServer zookeeper;
-	private static KafkaServer broker1;
+	private static List<KafkaServer> brokers;
 
+	private static boolean shutdownKafkaBroker;
 
 	@BeforeClass
 	public static void prepare() throws IOException {
 		LOG.info("Starting KafkaITCase.prepare()");
 		tmpZkDir = tempFolder.newFolder();
-		tmpKafkaDir = tempFolder.newFolder();
+
+		tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_KAFKA_SERVERS);
+		for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+			tmpKafkaDirs.add(tempFolder.newFolder());
+		}
+
 		kafkaHost = InetAddress.getLocalHost().getHostName();
 		zkPort = NetUtils.getAvailablePort();
-		kafkaPort = NetUtils.getAvailablePort();
 		zookeeperConnectionString = "localhost:" + zkPort;
 
 		zookeeper = null;
-		broker1 = null;
+		brokers = null;
 
 		try {
 			LOG.info("Starting Zookeeper");
 			zookeeper = getZookeeper();
 			LOG.info("Starting KafkaServer");
-			broker1 = getKafkaServer(0);
+			brokers = new ArrayList<KafkaServer>(NUMBER_OF_KAFKA_SERVERS);
+			for (int i = 0; i < NUMBER_OF_KAFKA_SERVERS; i++) {
+				brokers.add(getKafkaServer(i, tmpKafkaDirs.get(i)));
+			}
+
 			LOG.info("ZK and KafkaServer started.");
 		} catch (Throwable t) {
 			LOG.warn("Test failed with exception", t);
@@ -114,8 +125,10 @@ public class KafkaITCase {
 	@AfterClass
 	public static void shutDownServices() {
 		LOG.info("Shutting down all services");
-		if (broker1 != null) {
-			broker1.shutdown();
+		for (KafkaServer broker : brokers) {
+			if (broker != null) {
+				broker.shutdown();
+			}
 		}
 		if (zookeeper != null) {
 			try {
@@ -131,7 +144,7 @@ public class KafkaITCase {
 		LOG.info("Starting KafkaITCase.regularKafkaSourceTest()");
 
 		String topic = "regularKafkaSourceTestTopic";
-		createTestTopic(topic, 1);
+		createTestTopic(topic, 1, 1);
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
@@ -145,7 +158,7 @@ public class KafkaITCase {
 
 			@Override
 			public void invoke(Tuple2<Long, String> value) throws Exception {
-				LOG.info("Got " + value);
+				LOG.debug("Got {}", value);
 				String[] sp = value.f1.split("-");
 				int v = Integer.parseInt(sp[1]);
 
@@ -217,7 +230,7 @@ public class KafkaITCase {
 		LOG.info("Starting KafkaITCase.tupleTestTopology()");
 
 		String topic = "tupleTestTopic";
-		createTestTopic(topic, 1);
+		createTestTopic(topic, 1, 1);
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
@@ -231,7 +244,7 @@ public class KafkaITCase {
 
 			@Override
 			public void invoke(Tuple2<Long, String> value) throws Exception {
-				LOG.info("Got " + value);
+				LOG.debug("Got {}", value);
 				String[] sp = value.f1.split("-");
 				int v = Integer.parseInt(sp[1]);
 
@@ -305,8 +318,8 @@ public class KafkaITCase {
 		LOG.info("Starting KafkaITCase.customPartitioningTestTopology()");
 
 		String topic = "customPartitioningTestTopic";
-		
-		createTestTopic(topic, 3);
+
+		createTestTopic(topic, 3, 1);
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
@@ -323,7 +336,7 @@ public class KafkaITCase {
 
 			@Override
 			public void invoke(Tuple2<Long, String> value) throws Exception {
-				LOG.info("Got " + value);
+				LOG.debug("Got {}", value);
 				String[] sp = value.f1.split("-");
 				int v = Integer.parseInt(sp[1]);
 
@@ -412,6 +425,7 @@ public class KafkaITCase {
 		public int partition(Object key, int numPartitions) {
 			partitionerHasBeenCalled = true;
 
+			@SuppressWarnings("unchecked")
 			Tuple2<Long, String> tuple = (Tuple2<Long, String>) key;
 			if (tuple.f0 < 10) {
 				return 0;
@@ -441,14 +455,13 @@ public class KafkaITCase {
 		public boolean isEndOfStream(Tuple2<Long, String> nextElement) {
 			return false;
 		}
-
 	}
 
 	@Test
 	public void simpleTestTopology() throws Exception {
 		String topic = "simpleTestTopic";
 
-		createTestTopic(topic, 1);
+		createTestTopic(topic, 1, 1);
 
 		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 
@@ -462,7 +475,7 @@ public class KafkaITCase {
 
 			@Override
 			public void invoke(String value) throws Exception {
-				LOG.info("Got " + value);
+				LOG.debug("Got {}", value);
 				String[] sp = value.split("-");
 				int v = Integer.parseInt(sp[1]);
 				if (start == -1) {
@@ -524,13 +537,149 @@ public class KafkaITCase {
 		}
 	}
 
+	private static boolean leaderHasShutDown = false;
+
+	@Test
+	public void brokerFailureTest() throws Exception {
+		String topic = "brokerFailureTestTopic";
+
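+		// two partitions with replication factor 2, so the topic stays
+		// available when one of the three brokers is killed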
+		createTestTopic(topic, 2, 2);
 
-	private void createTestTopic(String topic, int numberOfPartitions) {
 		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-		kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+		final String leaderToShutDown =
+				kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+
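+		// background thread that kills the broker leading partition 0
+		// once the consuming sink sets the shutdownKafkaBroker flag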
+		final Thread brokerShutdown = new Thread(new Runnable() {
+			@Override
+			public void run() {
+				shutdownKafkaBroker = false;
+				while (!shutdownKafkaBroker) {
+					try {
+						Thread.sleep(10);
+					} catch (InterruptedException e) {
+						LOG.warn("Interruption", e);
+					}
+				}
+
+				for (KafkaServer kafkaServer : brokers) {
+					if (leaderToShutDown.equals(
+							kafkaServer.config().advertisedHostName()
+									+ ":"
+									+ kafkaServer.config().advertisedPort()
+					)) {
+						LOG.info("Killing Kafka Server {}", leaderToShutDown);
+						kafkaServer.shutdown();
+						leaderHasShutDown = true;
+						break;
+					}
+				}
+			}
+		});
+		brokerShutdown.start();
+
+		final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
+		// add consuming topology:
+		DataStreamSource<String> consuming = env.addSource(
+				new PersistentKafkaSource<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema(), 5000, 10, Offset.FROM_BEGINNING));
+		consuming.setParallelism(1);
+
+		consuming.addSink(new SinkFunction<String>() {
+			int elCnt = 0;
+			int start = 0;
+			int numOfMessagesToReceive = 100;
+
+			BitSet validator = new BitSet(numOfMessagesToReceive + 1);
+
+			@Override
+			public void invoke(String value) throws Exception {
+				LOG.debug("Got {}", value);
+				String[] sp = value.split("-");
+				int v = Integer.parseInt(sp[1]);
+
+				if (start == -1) {
+					start = v;
+				}
+				Assert.assertFalse("Received tuple twice", validator.get(v - start));
+				if (v - start < 0 && LOG.isWarnEnabled()) {
+					LOG.warn("Not in order: {}", value);
+				}
+
+				validator.set(v - start);
+				elCnt++;
+				if (elCnt == 20) {
+					// shut down a Kafka broker
+					shutdownKafkaBroker = true;
+				}
+
+				if (elCnt == numOfMessagesToReceive && leaderHasShutDown) {
+					// check if everything in the bitset is set to true
+					int nc;
+					if ((nc = validator.nextClearBit(0)) != numOfMessagesToReceive) {
+//						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+						System.out.println("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+					}
+					throw new SuccessException();
+				} else if (elCnt == numOfMessagesToReceive) {
+					numOfMessagesToReceive += 50;
+					LOG.info("Waiting for more messages till {}", numOfMessagesToReceive);
+				}
+			}
+		});
+
+		// add producing topology
+		DataStream<String> stream = env.addSource(new SourceFunction<String>() {
+			boolean running = true;
+
+			@Override
+			public void run(Collector<String> collector) throws Exception {
+				LOG.info("Starting source.");
+				int cnt = 0;
+				while (running) {
+					collector.collect("kafka-" + cnt++);
+
+					if ((cnt - 1) % 20 == 0) {
+						LOG.debug("Sending message #{}", cnt - 1);
+					}
+
+					try {
+						Thread.sleep(10);
+					} catch (InterruptedException ignored) {
+					}
+				}
+			}
+
+			@Override
+			public void cancel() {
+				LOG.info("Source got cancel()");
+				running = false;
+			}
+		});
+		stream.addSink(new KafkaSink<String>(zookeeperConnectionString, topic, new JavaDefaultStringSchema()))
+				.setParallelism(1);
+
+		try {
+			env.setParallelism(1);
+			env.execute();
+		} catch (JobExecutionException good) {
+			Throwable t = good.getCause();
+			int limit = 0;
+			while (!(t instanceof SuccessException)) {
+				// stop when the cause chain ends or becomes implausibly long
+				if (t == null || limit++ == 20) {
+					LOG.warn("Test failed with exception", good);
+					Assert.fail("Test failed with: " + good.getMessage());
+				}
+				t = t.getCause();
+			}
+		}
 	}
 
 
+	private void createTestTopic(String topic, int numberOfPartitions, int replicationFactor) {
+		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
+		kafkaTopicUtils.createTopic(topic, numberOfPartitions, replicationFactor);
+	}
+
 	private static TestingServer getZookeeper() throws Exception {
 		return new TestingServer(zkPort, tmpZkDir);
 	}
@@ -538,42 +687,24 @@ public class KafkaITCase {
 	/**
 	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
 	 */
-	private static KafkaServer getKafkaServer(int brokerId) throws UnknownHostException {
+	private static KafkaServer getKafkaServer(int brokerId, File tmpFolder) throws UnknownHostException {
 		Properties kafkaProperties = new Properties();
+
+		int kafkaPort = NetUtils.getAvailablePort();
+
 		// properties have to be Strings
 		kafkaProperties.put("advertised.host.name", kafkaHost);
 		kafkaProperties.put("port", Integer.toString(kafkaPort));
 		kafkaProperties.put("broker.id", Integer.toString(brokerId));
-		kafkaProperties.put("log.dir", tmpKafkaDir.toString());
+		kafkaProperties.put("log.dir", tmpFolder.toString());
 		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
 		KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
 
-		KafkaServer server = new KafkaServer(kafkaConfig, new LocalSystemTime());
+		KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
 		server.startup();
 		return server;
 	}
 
-	public static class LocalSystemTime implements Time {
-
-		@Override
-		public long milliseconds() {
-			return System.currentTimeMillis();
-		}
-		public long nanoseconds() {
-			return System.nanoTime();
-		}
-
-		@Override
-		public void sleep(long ms) {
-			try {
-				Thread.sleep(ms);
-			} catch (InterruptedException e) {
-				LOG.warn("Interruption", e);
-			}
-		}
-
-	}
-
 	public static class SuccessException extends Exception {
 		private static final long serialVersionUID = 1L;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
new file mode 100644
index 0000000..5f0e198
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
+import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.api.PartitionMetadata;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+
+public class KafkaTopicUtilsTest {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsTest.class);
+	private static final int NUMBER_OF_BROKERS = 2;
+	private static final String TOPIC = "myTopic";
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Test
+	public void test() {
+		int zkPort;
+		String kafkaHost;
+		String zookeeperConnectionString;
+
+		File tmpZkDir;
+		List<File> tmpKafkaDirs;
+		Map<String, KafkaServer> kafkaServers = null;
+		TestingServer zookeeper = null;
+
+		try {
+			tmpZkDir = tempFolder.newFolder();
+
+			tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_BROKERS);
+			for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
+				tmpKafkaDirs.add(tempFolder.newFolder());
+			}
+
+			zkPort = NetUtils.getAvailablePort();
+			kafkaHost = InetAddress.getLocalHost().getHostName();
+			zookeeperConnectionString = "localhost:" + zkPort;
+
+			// init zookeeper
+			zookeeper = new TestingServer(zkPort, tmpZkDir);
+
+			// init kafka kafkaServers
+			kafkaServers = new HashMap<String, KafkaServer>();
+
+			for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
+				KafkaServer kafkaServer = getKafkaServer(kafkaHost, zookeeperConnectionString, i, tmpKafkaDirs.get(i));
+				kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort(), kafkaServer);
+			}
+
+			// create Kafka topic
+			final KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
+			kafkaTopicUtils.createTopic(TOPIC, 1, 2);
+
+			// check whether topic exists
+			assertTrue(kafkaTopicUtils.topicExists(TOPIC));
+
+			// check number of partitions
+			assertEquals(1, kafkaTopicUtils.getNumberOfPartitions(TOPIC));
+
+			// get partition metadata without error
+			PartitionMetadata partitionMetadata = kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC, 0);
+			assertEquals(0, partitionMetadata.errorCode());
+
+			// get broker list
+			assertEquals(new HashSet<String>(kafkaServers.keySet()), kafkaTopicUtils.getBrokerAddresses(TOPIC));
+		} catch (Exception e) {
+			fail(e.toString());
+		} finally {
+			LOG.info("Shutting down all services");
+			if (kafkaServers != null) {
+				for (KafkaServer broker : kafkaServers.values()) {
+					if (broker != null) {
+						broker.shutdown();
+					}
+				}
+			}
+
+			if (zookeeper != null) {
+				try {
+					zookeeper.stop();
+				} catch (IOException e) {
+					LOG.warn("ZK.stop() failed", e);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Copied from com.github.sakserv.minicluster.KafkaLocalBrokerIntegrationTest (ASL licensed)
+	 */
+	private static KafkaServer getKafkaServer(String kafkaHost, String zookeeperConnectionString, int brokerId, File tmpFolder) throws UnknownHostException {
+		Properties kafkaProperties = new Properties();
+
+		int kafkaPort = NetUtils.getAvailablePort();
+
+		// properties have to be Strings
+		kafkaProperties.put("advertised.host.name", kafkaHost);
+		kafkaProperties.put("port", Integer.toString(kafkaPort));
+		kafkaProperties.put("broker.id", Integer.toString(brokerId));
+		kafkaProperties.put("log.dir", tmpFolder.toString());
+		kafkaProperties.put("zookeeper.connect", zookeeperConnectionString);
+		KafkaConfig kafkaConfig = new KafkaConfig(kafkaProperties);
+
+		KafkaServer server = new KafkaServer(kafkaConfig, new KafkaLocalSystemTime());
+		server.startup();
+		return server;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/cb34e976/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
new file mode 100644
index 0000000..18fa46f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/util/KafkaLocalSystemTime.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka.util;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.utils.Time;
+
+public class KafkaLocalSystemTime implements Time {
+
+	private static final Logger LOG = LoggerFactory.getLogger(KafkaLocalSystemTime.class);
+
+	@Override
+	public long milliseconds() {
+		return System.currentTimeMillis();
+	}
+
+	@Override
+	public long nanoseconds() {
+		return System.nanoTime();
+	}
+
+	@Override
+	public void sleep(long ms) {
+		try {
+			Thread.sleep(ms);
+		} catch (InterruptedException e) {
+			LOG.warn("Interruption", e);
+		}
+	}
+
+}
+