Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/17 09:30:42 UTC

flink git commit: [streaming][connectors] Add more configuration options to PersistentKafkaSource

Repository: flink
Updated Branches:
  refs/heads/master c2faa6fe3 -> ea710204e


[streaming][connectors] Add more configuration options to PersistentKafkaSource

This closes #607


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea710204
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea710204
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea710204

Branch: refs/heads/master
Commit: ea710204e4d1ac6ac6d3e6ce31215aa0b8ff50b1
Parents: c2faa6f
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Apr 16 15:55:44 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Apr 17 09:25:37 2015 +0200

----------------------------------------------------------------------
 .../kafka/api/simple/PersistentKafkaSource.java |  40 ++++---
 .../simple/iterator/KafkaConsumerIterator.java  |   8 +-
 .../iterator/KafkaIdleConsumerIterator.java     |   3 +
 .../KafkaMultiplePartitionsIterator.java        |  11 +-
 .../iterator/KafkaSinglePartitionIterator.java  | 117 ++++++++++++-------
 .../kafka/api/simple/offset/GivenOffset.java    |   3 +
 .../kafka/api/simple/offset/KafkaOffset.java    |  26 ++++-
 .../streaming/connectors/kafka/KafkaITCase.java |  31 ++---
 .../src/test/resources/log4j-test.properties    |   2 +-
 9 files changed, 157 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
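
For illustration, a minimal sketch of how the new settings could be supplied. The
PersistentKafkaSource constructors are not shown in this diff, so the wiring is only
assumed from the consumerConfig field below; the connection values are hypothetical:

    import java.util.Properties;
    import kafka.consumer.ConsumerConfig;

    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181");       // required by Kafka's ConsumerConfig
    props.setProperty("group.id", "my-flink-consumer");             // required by Kafka's ConsumerConfig
    // new in this commit: bounded leader-detection retries
    props.setProperty("flink.waitOnFailedLeaderDetection", "2000"); // ms to wait between leader lookups
    props.setProperty("flink.maxLeaderDetectionRetries", "3");      // give up after this many lookups
    // pre-existing knob: wait time after an empty fetch
    props.setProperty("flink.waitOnEmptyFetchMillis", "500");
    ConsumerConfig consumerConfig = new ConsumerConfig(props);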


http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index f9d9508..2af8fb2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -57,15 +57,20 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 	private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
 
 	public static final String WAIT_ON_EMPTY_FETCH_KEY = "flink.waitOnEmptyFetchMillis";
+	public static final String WAIT_ON_FAILED_LEADER_MS_KEY = "flink.waitOnFailedLeaderDetection";
+	public static final int WAIT_ON_FAILED_LEADER_MS_DEFAULT = 2000;
+
+	public static final String MAX_FAILED_LEADER_RETRIES_KEY = "flink.maxLeaderDetectionRetries";
+	public static final int MAX_FAILED_LEADER_RETRIES_DEFAULT = 3;
 
 	private final String topicId;
 	private final KafkaOffset startingOffset;
 	private transient ConsumerConfig consumerConfig; // ConsumerConfig is not serializable.
 
 	private transient KafkaConsumerIterator iterator;
-	private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
+	private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSetOperatorState;
 
-	private transient Map<Integer, KafkaOffset> partitions;
+	private transient Map<Integer, KafkaOffset> partitionOffsets;
 
 	/**
 	 * Creates a persistent Kafka source that consumes a topic.
@@ -180,6 +185,8 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 		}
 	}
 
+	// ---------------------- Source lifecycle methods (open / run / cancel) -----------------
+
 	@SuppressWarnings("unchecked")
 	@Override
 	public void open(Configuration parameters) throws InterruptedException {
@@ -193,29 +200,32 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 		int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
 
 		if (indexOfSubtask >= numberOfPartitions) {
+			LOG.info("Creating idle consumer because this subtask ({}) is higher than the number partitions ({})", indexOfSubtask + 1, numberOfPartitions);
 			iterator = new KafkaIdleConsumerIterator();
 		} else {
 			if (context.containsState("kafka")) {
-				kafkaOffSet = (OperatorState<Map<Integer, KafkaOffset>>) context.getState("kafka");
+				LOG.info("Initializing PersistentKafkaSource from existing state.");
+				kafkaOffSetOperatorState = (OperatorState<Map<Integer, KafkaOffset>>) context.getState("kafka");
 
-				partitions = kafkaOffSet.getState();
+				partitionOffsets = kafkaOffSetOperatorState.getState();
 			} else {
-				partitions = new HashMap<Integer, KafkaOffset>();
+				LOG.info("No existing state found. Creating new");
+				partitionOffsets = new HashMap<Integer, KafkaOffset>();
 
 				for (int partitionIndex = indexOfSubtask; partitionIndex < numberOfPartitions; partitionIndex += numberOfSubtasks) {
-					partitions.put(partitionIndex, startingOffset);
+					partitionOffsets.put(partitionIndex, startingOffset);
 				}
 
-				kafkaOffSet = new OperatorState<Map<Integer, KafkaOffset>>(partitions);
+				kafkaOffSetOperatorState = new OperatorState<Map<Integer, KafkaOffset>>(partitionOffsets);
 
-				context.registerState("kafka", kafkaOffSet);
+				context.registerState("kafka", kafkaOffSetOperatorState);
 			}
 
-			iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, this.consumerConfig);
+			iterator = new KafkaMultiplePartitionsIterator(topicId, partitionOffsets, kafkaTopicUtils, this.consumerConfig);
 
 			if (LOG.isInfoEnabled()) {
-				LOG.info("PersistentKafkaSource ({}/{}) listening to partitions {} of topic {}.",
-						indexOfSubtask + 1, numberOfSubtasks, partitions.keySet(), topicId);
+				LOG.info("PersistentKafkaSource ({}/{}) listening to partitionOffsets {} of topic {}.",
+						indexOfSubtask + 1, numberOfSubtasks, partitionOffsets.keySet(), topicId);
 			}
 		}
 
@@ -236,8 +246,8 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 			collector.collect(out);
 
 			// TODO avoid object creation
-			partitions.put(msg.getPartition(), new GivenOffset(msg.getOffset()));
-			kafkaOffSet.update(partitions);
+			partitionOffsets.put(msg.getPartition(), new GivenOffset(msg.getOffset()));
+			kafkaOffSetOperatorState.update(partitionOffsets);
 		}
 	}
 
@@ -246,6 +256,10 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
 		LOG.info("PersistentKafkaSource has been cancelled");
 	}
 
+
+
+	// ---------------------- (Java)Serialization methods for the consumerConfig -----------------
+
 	private void writeObject(ObjectOutputStream out)
 			throws IOException, ClassNotFoundException {
 		out.defaultWriteObject();
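
The round-robin partition assignment done in open() above can be read as the following
helper (a sketch for illustration only; the helper itself is not part of this change):

    import java.util.ArrayList;
    import java.util.List;

    // Subtask i consumes partitions i, i + numberOfSubtasks, i + 2 * numberOfSubtasks, ...
    static List<Integer> assignedPartitions(int indexOfSubtask, int numberOfSubtasks, int numberOfPartitions) {
        List<Integer> assigned = new ArrayList<Integer>();
        for (int partitionIndex = indexOfSubtask; partitionIndex < numberOfPartitions; partitionIndex += numberOfSubtasks) {
            assigned.add(partitionIndex);
        }
        return assigned;
    }

    // e.g. 5 partitions, 2 subtasks: subtask 0 -> [0, 2, 4], subtask 1 -> [1, 3];
    // a subtask whose index is >= the partition count gets the KafkaIdleConsumerIterator instead.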

http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
index ef87b34..42fe003 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
@@ -24,9 +24,9 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadat
  */
 public interface KafkaConsumerIterator {
 
-	public void initialize() throws InterruptedException;
+	void initialize() throws InterruptedException;
 
-	public boolean hasNext();
+	boolean hasNext();
 
 	/**
 	 * Returns the next message received from Kafka as a
@@ -34,7 +34,7 @@ public interface KafkaConsumerIterator {
 	 *
 	 * @return next message as a byte array.
 	 */
-	public byte[] next() throws InterruptedException;
+	byte[] next() throws InterruptedException;
 
 	/**
 	 * Returns the next message and its offset received from
@@ -42,5 +42,5 @@ public interface KafkaConsumerIterator {
 	 *
 	 * @return next message and its offset.
 	 */
-	public MessageWithMetadata nextWithOffset() throws InterruptedException;
+	MessageWithMetadata nextWithOffset() throws InterruptedException;
 }
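
A consumer loop against this interface would look roughly as follows (sketch only;
process() is a hypothetical callback and error handling is omitted):

    KafkaConsumerIterator iterator = ...; // e.g. a KafkaMultiplePartitionsIterator
    iterator.initialize();
    while (iterator.hasNext()) {
        MessageWithMetadata msg = iterator.nextWithOffset();
        process(msg.getMessage(), msg.getOffset(), msg.getPartition()); // hypothetical
    }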

http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
index af16ab5..1935118 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
@@ -21,6 +21,9 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadat
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * No-op iterator. Used when there are more source tasks than Kafka partitions.
+ */
 public class KafkaIdleConsumerIterator implements KafkaConsumerIterator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaIdleConsumerIterator.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
index 9da1bea..daebaaf 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
@@ -29,6 +29,11 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+/**
+ * Iterator over multiple Kafka partitions.
+ *
+ * This is needed when the number of partitions exceeds the number of Kafka source tasks.
+ */
 public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);
@@ -55,9 +60,13 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 
 	@Override
 	public void initialize() throws InterruptedException {
+		LOG.info("Initializing iterator with {} partitions", partitions.size());
+		String partInfo = "";
 		for (KafkaSinglePartitionIterator partition : partitions) {
 			partition.initialize();
+			partInfo += partition.toString() + " ";
 		}
+		LOG.info("Initialized partitions {}", partInfo);
 	}
 
 	@Override
@@ -91,7 +100,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
 			// do not wait if a new message has been fetched
 			if (!gotNewMessage) {
 				try {
-					Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY));
+					Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY, consumerConfig.fetchWaitMaxMs()));
 				} catch (InterruptedException e) {
 					LOG.warn("Interrupted while waiting for new messages", e);
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
index 0aaa771..bf3b3f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
@@ -29,6 +29,8 @@ import java.util.Set;
 import kafka.consumer.ConsumerConfig;
 import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
 import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
+import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
 import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
 import org.slf4j.Logger;
@@ -48,6 +50,8 @@ import kafka.message.MessageAndOffset;
 
 /**
  * Iterates the records received from a partition of a Kafka topic as byte arrays.
+ *
+ * This code is partly based on https://gist.github.com/ashrithr/5811266.
  */
 public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Serializable {
 
@@ -55,8 +59,6 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePartitionIterator.class);
 
-	private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 10000L;
-
 	private List<String> hosts;
 	private String topic;
 	private int partition;
@@ -107,24 +109,12 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	 * Initializes the connection by detecting the leading broker of
 	 * the topic and establishing a connection to it.
 	 */
-	public void initialize() throws InterruptedException {
+	public void initialize() {
 		if (LOG.isInfoEnabled()) {
 			LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts);
 		}
 
-		PartitionMetadata metadata;
-		do {
-			metadata = findLeader(hosts, topic, partition);
-			try {
-				Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
-			} catch (InterruptedException e) {
-				throw new InterruptedException("Establishing connection to Kafka failed");
-			}
-		} while (metadata == null);
-
-		if (metadata.leader() == null) {
-			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
-		}
+		PartitionMetadata metadata = getPartitionMetadata();
 
 		leadBroker = metadata.leader();
 		clientName = "Client_" + topic + "_" + partition;
@@ -134,16 +124,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		try {
 			readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
 		} catch (NotLeaderForPartitionException e) {
-			do {
-				metadata = findLeader(hosts, topic, partition);
-
-				try {
-					Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
-				} catch (InterruptedException ie) {
-					throw new InterruptedException("Establishing connection to Kafka failed");
-				}
-			} while (metadata == null);
-			readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+			throw new RuntimeException("Unable to get offset",e);
 		}
 
 		try {
@@ -156,6 +137,38 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		}
 	}
 
+	private PartitionMetadata getPartitionMetadata() {
+		PartitionMetadata metadata;
+		int retry = 0;
+		int waitTime = consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_FAILED_LEADER_MS_KEY, PersistentKafkaSource.WAIT_ON_FAILED_LEADER_MS_DEFAULT);
+		do {
+			metadata = findLeader(hosts, topic, partition);
+			if (metadata == null) {
+				retry++;
+				if (retry == consumerConfig.props().getInt(PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_KEY, PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_DEFAULT)) {
+					throw new RuntimeException("Tried finding a leader " + retry + " times without success");
+				}
+				LOG.warn("Unable to get leader and partition metadata. Waiting {} ms until retrying. Retries so far: {}", waitTime, retry - 1);
+				try {
+					Thread.sleep(waitTime);
+				} catch (InterruptedException e) {
+					throw new RuntimeException("Establishing connection to Kafka failed", e);
+				}
+			}
+		} while (metadata == null);
+
+		if (metadata.leader() == null) {
+			throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
+		}
+
+		return metadata;
+	}
+
 	/**
 	 * Sets the partition to read from.
 	 *
@@ -185,18 +198,18 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	 *
 	 * @return next message as a byte array.
 	 */
-	public byte[] next() throws InterruptedException {
+	public byte[] next() {
 		return nextWithOffset().getMessage();
 	}
 
-	public boolean fetchHasNext() throws InterruptedException {
+	public boolean fetchHasNext() {
 		synchronized (fetchResponse) {
 			if (!iter.hasNext()) {
 				try {
 					resetFetchResponse(readOffset);
 				} catch (ClosedChannelException e) {
 					if (LOG.isWarnEnabled()) {
-						LOG.warn("Got ClosedChannelException, trying to find new leader.");
+						LOG.warn("Got ClosedChannelException, trying to find new leader.", e);
 					}
 					findNewLeader();
 				}
@@ -213,7 +226,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	 *
 	 * @return next message and its offset.
 	 */
-	public MessageWithMetadata nextWithOffset() throws InterruptedException {
+	public MessageWithMetadata nextWithOffset() {
 
 		synchronized (fetchResponse) {
 			if (!iter.hasNext()) {
@@ -243,7 +256,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 	//  Internal utilities
 	// --------------------------------------------------------------------------------------------
 
-	private void resetFetchResponse(long offset) throws InterruptedException, ClosedChannelException {
+	private void resetFetchResponse(long offset) throws ClosedChannelException {
 		FetchRequest req = new FetchRequestBuilder().clientId(clientName)
 				.addFetch(topic, partition, offset, consumerConfig.fetchMessageMaxBytes()).build();
 
@@ -258,10 +271,18 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 
 			if (code == ErrorMapping.OffsetOutOfRangeCode()) {
 				if (LOG.isErrorEnabled()) {
-					LOG.error("Asked for invalid offset {}, setting the offset to the latest.", offset);
+					LOG.error("Asked for invalid offset {}", offset);
+				}
+				String reset = consumerConfig.autoOffsetReset();
+				if (reset.equals("smallest")) {
+					LOG.info("Setting read offset to the beginning (smallest)");
+					readOffset = new BeginningOffset().getOffset(consumer, topic, partition, clientName);
+				} else if (reset.equals("largest")) {
+					LOG.info("Setting read offset to the current offset (largest)");
+					readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
+				} else {
+					throw new RuntimeException("Unknown 'auto.offset.reset' mode '" + reset + "'. Supported values are 'smallest' and 'largest'.");
 				}
-
-				readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
 			}
 
 			findNewLeader();
@@ -270,15 +291,14 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		iter = fetchResponse.messageSet(topic, partition).iterator();
 	}
 
-	private void findNewLeader() throws InterruptedException {
+	private void findNewLeader() {
 		consumer.close();
 		consumer = null;
 		leadBroker = findNewLeader(leadBroker, topic, partition);
 		consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
 	}
 
-	private PartitionMetadata findLeader(List<String> addresses, String a_topic,
-			int a_partition) {
+	private PartitionMetadata findLeader(List<String> addresses, String topic, int partition) {
 
 		PartitionMetadata returnMetaData = null;
 		loop:
@@ -295,7 +315,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 			SimpleConsumer consumer = null;
 			try {
 				consumer = new SimpleConsumer(host, port, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), "leaderLookup");
-				List<String> topics = Collections.singletonList(a_topic);
+				List<String> topics = Collections.singletonList(topic);
 
 				TopicMetadataRequest req = new TopicMetadataRequest(topics);
 
@@ -304,7 +324,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 				List<TopicMetadata> metaData = resp.topicsMetadata();
 				for (TopicMetadata item : metaData) {
 					for (PartitionMetadata part : item.partitionsMetadata()) {
-						if (part.partitionId() == a_partition) {
+						if (part.partitionId() == partition) {
 							returnMetaData = part;
 							break loop;
 						}
@@ -313,10 +333,9 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 			} catch (Exception e) {
 				if (e instanceof ClosedChannelException) {
 					LOG.warn("Got ClosedChannelException while trying to communicate with Broker" +
-							"[{}] to find Leader for [{}, {}]. Trying other replicas.", address, a_topic, a_partition);
+							"[{}] to find Leader for [{}, {}]. Trying other replicas.", address, topic, partition);
 				} else {
-					throw new RuntimeException("Error communicating with Broker [" + address
-							+ "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
+					throw new RuntimeException("Error communicating with Broker [" + address + "] to find Leader for [" + topic + ", " + partition + "]", e);
 				}
 			} finally {
 				if (consumer != null) {
@@ -333,18 +352,18 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		return returnMetaData;
 	}
 
-	private Broker findNewLeader(Broker a_oldLeader, String a_topic, int a_partition) throws InterruptedException {
+	private Broker findNewLeader(Broker oldLeader, String topic, int partition) {
 		for (int i = 0; i < 3; i++) {
 			if (LOG.isInfoEnabled()) {
 				LOG.info("Trying to find a new leader after Broker failure.");
 			}
 			boolean goToSleep = false;
-			PartitionMetadata metadata = findLeader(replicaBrokers, a_topic, a_partition);
+			PartitionMetadata metadata = findLeader(replicaBrokers, topic, partition);
 			if (metadata == null) {
 				goToSleep = true;
 			} else if (metadata.leader() == null) {
 				goToSleep = true;
-			} else if (a_oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
+			} else if (oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
 				// first time through if the leader hasn't changed give ZooKeeper a second to recover
 				// second time, assume the broker did recover before failover, or it was a non-Broker issue
 				//
@@ -362,4 +381,12 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
 		throw new RuntimeException("Unable to find new leader after Broker failure.");
 	}
 
+	public int getId() {
+		return this.partition;
+	}
+
+	@Override
+	public String toString() {
+		return "SinglePartitionIterator{partition="+partition+" readOffset="+readOffset+"}";
+	}
 }
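
The invalid-offset recovery added above is steered by Kafka's standard consumer
setting; for example (key and values as read by ConsumerConfig.autoOffsetReset()
in Kafka 0.8):

    props.setProperty("auto.offset.reset", "smallest"); // jump back to the earliest available offset
    // or "largest" to skip ahead to the most recent offset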

http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
index fef6325..3aec7ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
@@ -19,6 +19,9 @@ package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
 
 import kafka.javaapi.consumer.SimpleConsumer;
 
+/**
+ * Offset given by a message read from Kafka.
+ */
 public class GivenOffset extends KafkaOffset {
 
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
index ac45e32..2eaa2b8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
 
 import java.io.Serializable;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -29,6 +30,9 @@ import kafka.common.TopicAndPartition;
 import kafka.javaapi.OffsetResponse;
 import kafka.javaapi.consumer.SimpleConsumer;
 
+/**
+ * Superclass for various kinds of KafkaOffsets.
+ */
 public abstract class KafkaOffset implements Serializable {
 
 	private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class);
@@ -38,6 +42,15 @@ public abstract class KafkaOffset implements Serializable {
 	public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
 			String clientName);
 
+	/**
+	 * Requests the last offset before the given time for a topic partition from the lead broker.
+	 *
+	 * @param consumer consumer connected to the partition's lead broker
+	 * @param topic topic to request the offset for
+	 * @param partition partition to request the offset for
+	 * @param whichTime type of offset request (latest time / earliest time)
+	 * @param clientName name identifying this client towards the broker
+	 * @return the requested offset
+	 */
 	protected long getLastOffset(SimpleConsumer consumer, String topic, int partition,
 			long whichTime, String clientName) {
 		TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
@@ -49,22 +62,25 @@ public abstract class KafkaOffset implements Serializable {
 		OffsetResponse response = consumer.getOffsetsBefore(request);
 
 		while (response.hasError()) {
-			switch (response.errorCode(topic, partition)) {
+			int errorCode = response.errorCode(topic, partition);
+			LOG.warn("Response has error. Error code "+errorCode);
+			switch (errorCode) {
 				case 6:
 				case 3:
 					LOG.warn("Kafka broker trying to fetch from a non-leader broker.");
 					break;
 				default:
-					throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
-							+ response.errorCode(topic, partition));
+					throw new RuntimeException("Error fetching data from Kafka broker. Error code " + errorCode);
 			}
 
-			request = new kafka.javaapi.OffsetRequest(requestInfo,
-					kafka.api.OffsetRequest.CurrentVersion(), clientName);
+			request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
 			response = consumer.getOffsetsBefore(request);
 		}
 
 		long[] offsets = response.offsets(topic, partition);
+		if (offsets.length > 1) {
+			LOG.warn("The offset response unexpectedly contained more than one offset: {}. Using only the first one.", Arrays.toString(offsets));
+		}
 		return offsets[0];
 	}
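
For reference, a sketch of how subclasses are expected to call getLastOffset (the
whichTime constants come from Kafka 0.8's kafka.api.OffsetRequest; the BeginningOffset /
CurrentOffset pairing is inferred from their usage in the iterator code above):

    long earliest = getLastOffset(consumer, topic, partition,
            kafka.api.OffsetRequest.EarliestTime(), clientName); // e.g. BeginningOffset
    long latest = getLastOffset(consumer, topic, partition,
            kafka.api.OffsetRequest.LatestTime(), clientName);   // e.g. CurrentOffset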
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 9ecd0a3..87c6a34 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -670,8 +670,7 @@ public class KafkaITCase {
 		createTestTopic(topic, 2, 2);
 
 		KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-		final String leaderToShutDown =
-				kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+		final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
 
 		final Thread brokerShutdown = new Thread(new Runnable() {
 			@Override
@@ -711,13 +710,14 @@ public class KafkaITCase {
 		consuming.addSink(new SinkFunction<String>() {
 			int elCnt = 0;
 			int start = 0;
-			int numOfMessagesToReceive = 100;
+			int numOfMessagesToBeCorrect = 100;
+			int stopAfterMessages = 150;
 
-			BitSet validator = new BitSet(numOfMessagesToReceive + 1);
+			BitSet validator = new BitSet(numOfMessagesToBeCorrect + 1);
 
 			@Override
 			public void invoke(String value) throws Exception {
-				LOG.debug("Got " + value);
+				LOG.info("Got message = " + value + " leader has shut down "+leaderHasShutDown+" el cnt = "+elCnt+" to rec"+ numOfMessagesToBeCorrect);
 				String[] sp = value.split("-");
 				int v = Integer.parseInt(sp[1]);
 
@@ -736,16 +736,15 @@ public class KafkaITCase {
 					shutdownKafkaBroker = true;
 				}
 
-				if (elCnt == numOfMessagesToReceive && leaderHasShutDown) {
-					// check if everything in the bitset is set to true
-					int nc;
-					if ((nc = validator.nextClearBit(0)) != numOfMessagesToReceive) {
-						throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+				if (leaderHasShutDown) { // it only makes sense to check once the shutdown has completed
+					if (elCnt >= stopAfterMessages) {
+						// check if everything in the bitset is set to true
+						int nc;
+						if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
+							throw new RuntimeException("The bitset was not set to 1 on all elements to be checked. Next clear:" + nc + " Set: " + validator);
+						}
+						throw new SuccessException();
 					}
-					throw new SuccessException();
-				} else if (elCnt == numOfMessagesToReceive) {
-					numOfMessagesToReceive += 50;
-					LOG.info("Waiting for more messages till {}", numOfMessagesToReceive);
 				}
 			}
 		});
@@ -759,7 +758,9 @@ public class KafkaITCase {
 				LOG.info("Starting source.");
 				int cnt = 0;
 				while (running) {
-					collector.collect("kafka-" + cnt++);
+					String msg = "kafka-" + cnt++;
+					collector.collect(msg);
+					LOG.info("sending message = "+msg);
 
 					if ((cnt - 1) % 20 == 0) {
 						LOG.debug("Sending message #{}", cnt - 1);

http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
index dc20726..9ede613 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
 
 log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
 log4j.appender.testlogger.target = System.err