You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by rm...@apache.org on 2015/04/17 09:30:42 UTC
flink git commit: [streaming][connectors] Add more configuration
options to PersistentKafkaSource
Repository: flink
Updated Branches:
refs/heads/master c2faa6fe3 -> ea710204e
[streaming][connectors] Add more configuration options to PersistentKafkaSource
This closes #607
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea710204
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea710204
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea710204
Branch: refs/heads/master
Commit: ea710204e4d1ac6ac6d3e6ce31215aa0b8ff50b1
Parents: c2faa6f
Author: Robert Metzger <rm...@apache.org>
Authored: Thu Apr 16 15:55:44 2015 +0200
Committer: Robert Metzger <rm...@apache.org>
Committed: Fri Apr 17 09:25:37 2015 +0200
----------------------------------------------------------------------
.../kafka/api/simple/PersistentKafkaSource.java | 40 ++++---
.../simple/iterator/KafkaConsumerIterator.java | 8 +-
.../iterator/KafkaIdleConsumerIterator.java | 3 +
.../KafkaMultiplePartitionsIterator.java | 11 +-
.../iterator/KafkaSinglePartitionIterator.java | 117 ++++++++++++-------
.../kafka/api/simple/offset/GivenOffset.java | 3 +
.../kafka/api/simple/offset/KafkaOffset.java | 26 ++++-
.../streaming/connectors/kafka/KafkaITCase.java | 31 ++---
.../src/test/resources/log4j-test.properties | 2 +-
9 files changed, 157 insertions(+), 84 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
index f9d9508..2af8fb2 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/PersistentKafkaSource.java
@@ -57,15 +57,20 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class);
public static final String WAIT_ON_EMPTY_FETCH_KEY = "flink.waitOnEmptyFetchMillis";
+ public static final String WAIT_ON_FAILED_LEADER_MS_KEY = "flink.waitOnFailedLeaderDetection";
+ public static final int WAIT_ON_FAILED_LEADER__MS_DEFAULT = 2000;
+
+ public static final String MAX_FAILED_LEADER_RETRIES_KEY = "flink.maxLeaderDetectionRetries";
+ public static final int MAX_FAILED_LEADER_RETRIES_DEFAULT = 3;
private final String topicId;
private final KafkaOffset startingOffset;
private transient ConsumerConfig consumerConfig; // ConsumerConfig is not serializable.
private transient KafkaConsumerIterator iterator;
- private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSet;
+ private transient OperatorState<Map<Integer, KafkaOffset>> kafkaOffSetOperatorState;
- private transient Map<Integer, KafkaOffset> partitions;
+ private transient Map<Integer, KafkaOffset> partitionOffsets;
/**
* Creates a persistent Kafka source that consumes a topic.
@@ -180,6 +185,8 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
}
}
+ // ---------------------- Source lifecycle methods (open / run / cancel ) -----------------
+
@SuppressWarnings("unchecked")
@Override
public void open(Configuration parameters) throws InterruptedException {
@@ -193,29 +200,32 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
int numberOfPartitions = kafkaTopicUtils.getNumberOfPartitions(topicId);
if (indexOfSubtask >= numberOfPartitions) {
+ LOG.info("Creating idle consumer because this subtask ({}) is higher than the number of partitions ({})", indexOfSubtask + 1, numberOfPartitions);
iterator = new KafkaIdleConsumerIterator();
} else {
if (context.containsState("kafka")) {
- kafkaOffSet = (OperatorState<Map<Integer, KafkaOffset>>) context.getState("kafka");
+ LOG.info("Initializing PersistentKafkaSource from existing state.");
+ kafkaOffSetOperatorState = (OperatorState<Map<Integer, KafkaOffset>>) context.getState("kafka");
- partitions = kafkaOffSet.getState();
+ partitionOffsets = kafkaOffSetOperatorState.getState();
} else {
- partitions = new HashMap<Integer, KafkaOffset>();
+ LOG.info("No existing state found. Creating new");
+ partitionOffsets = new HashMap<Integer, KafkaOffset>();
for (int partitionIndex = indexOfSubtask; partitionIndex < numberOfPartitions; partitionIndex += numberOfSubtasks) {
- partitions.put(partitionIndex, startingOffset);
+ partitionOffsets.put(partitionIndex, startingOffset);
}
- kafkaOffSet = new OperatorState<Map<Integer, KafkaOffset>>(partitions);
+ kafkaOffSetOperatorState = new OperatorState<Map<Integer, KafkaOffset>>(partitionOffsets);
- context.registerState("kafka", kafkaOffSet);
+ context.registerState("kafka", kafkaOffSetOperatorState);
}
- iterator = new KafkaMultiplePartitionsIterator(topicId, partitions, kafkaTopicUtils, this.consumerConfig);
+ iterator = new KafkaMultiplePartitionsIterator(topicId, partitionOffsets, kafkaTopicUtils, this.consumerConfig);
if (LOG.isInfoEnabled()) {
- LOG.info("PersistentKafkaSource ({}/{}) listening to partitions {} of topic {}.",
- indexOfSubtask + 1, numberOfSubtasks, partitions.keySet(), topicId);
+ LOG.info("PersistentKafkaSource ({}/{}) listening to partitionOffsets {} of topic {}.",
+ indexOfSubtask + 1, numberOfSubtasks, partitionOffsets.keySet(), topicId);
}
}
@@ -236,8 +246,8 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
collector.collect(out);
// TODO avoid object creation
- partitions.put(msg.getPartition(), new GivenOffset(msg.getOffset()));
- kafkaOffSet.update(partitions);
+ partitionOffsets.put(msg.getPartition(), new GivenOffset(msg.getOffset()));
+ kafkaOffSetOperatorState.update(partitionOffsets);
}
}
@@ -246,6 +256,10 @@ public class PersistentKafkaSource<OUT> extends ConnectorSource<OUT> {
LOG.info("PersistentKafkaSource has been cancelled");
}
+
+
+ // ---------------------- (Java)Serialization methods for the consumerConfig -----------------
+
private void writeObject(ObjectOutputStream out)
throws IOException, ClassNotFoundException {
out.defaultWriteObject();
http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
index ef87b34..42fe003 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaConsumerIterator.java
@@ -24,9 +24,9 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadat
*/
public interface KafkaConsumerIterator {
- public void initialize() throws InterruptedException;
+ void initialize() throws InterruptedException;
- public boolean hasNext();
+ boolean hasNext();
/**
* Returns the next message received from Kafka as a
@@ -34,7 +34,7 @@ public interface KafkaConsumerIterator {
*
* @return next message as a byte array.
*/
- public byte[] next() throws InterruptedException;
+ byte[] next() throws InterruptedException;
/**
* Returns the next message and its offset received from
@@ -42,5 +42,5 @@ public interface KafkaConsumerIterator {
*
* @return next message and its offset.
*/
- public MessageWithMetadata nextWithOffset() throws InterruptedException;
+ MessageWithMetadata nextWithOffset() throws InterruptedException;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
index af16ab5..1935118 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaIdleConsumerIterator.java
@@ -21,6 +21,9 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadat
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * No-op iterator. Used when more source tasks are available than Kafka partitions.
+ */
public class KafkaIdleConsumerIterator implements KafkaConsumerIterator {
private static final Logger LOG = LoggerFactory.getLogger(KafkaIdleConsumerIterator.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
index 9da1bea..daebaaf 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaMultiplePartitionsIterator.java
@@ -29,6 +29,11 @@ import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+/**
+ * Iterator over multiple Kafka partitions.
+ *
+ * This is needed when num partitions > num kafka sources.
+ */
public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
private static final Logger LOG = LoggerFactory.getLogger(KafkaMultiplePartitionsIterator.class);
@@ -55,9 +60,13 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
@Override
public void initialize() throws InterruptedException {
+ LOG.info("Initializing iterator with {} partitions", partitions.size());
+ String partInfo = "";
for (KafkaSinglePartitionIterator partition : partitions) {
partition.initialize();
+ partInfo += partition.toString() + " ";
}
+ LOG.info("Initialized partitions {}", partInfo);
}
@Override
@@ -91,7 +100,7 @@ public class KafkaMultiplePartitionsIterator implements KafkaConsumerIterator {
// do not wait if a new message has been fetched
if (!gotNewMessage) {
try {
- Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY));
+ Thread.sleep(consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_EMPTY_FETCH_KEY), consumerConfig.fetchWaitMaxMs());
} catch (InterruptedException e) {
LOG.warn("Interrupted while waiting for new messages", e);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
index 0aaa771..bf3b3f4 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java
@@ -29,6 +29,8 @@ import java.util.Set;
import kafka.consumer.ConsumerConfig;
import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
import org.apache.flink.streaming.connectors.kafka.api.simple.MessageWithMetadata;
+import org.apache.flink.streaming.connectors.kafka.api.simple.PersistentKafkaSource;
+import org.apache.flink.streaming.connectors.kafka.api.simple.offset.BeginningOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.CurrentOffset;
import org.apache.flink.streaming.connectors.kafka.api.simple.offset.KafkaOffset;
import org.slf4j.Logger;
@@ -48,6 +50,8 @@ import kafka.message.MessageAndOffset;
/**
* Iterates the records received from a partition of a Kafka topic as byte arrays.
+ *
+ * This code is in parts based on https://gist.github.com/ashrithr/5811266.
*/
public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Serializable {
@@ -55,8 +59,6 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
private static final Logger LOG = LoggerFactory.getLogger(KafkaSinglePartitionIterator.class);
- private static final long DEFAULT_WAIT_ON_EMPTY_FETCH = 10000L;
-
private List<String> hosts;
private String topic;
private int partition;
@@ -107,24 +109,12 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
* Initializes the connection by detecting the leading broker of
* the topic and establishing a connection to it.
*/
- public void initialize() throws InterruptedException {
+ public void initialize() {
if (LOG.isInfoEnabled()) {
LOG.info("Initializing consumer {} / {} with hosts {}", topic, partition, hosts);
}
- PartitionMetadata metadata;
- do {
- metadata = findLeader(hosts, topic, partition);
- try {
- Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
- } catch (InterruptedException e) {
- throw new InterruptedException("Establishing connection to Kafka failed");
- }
- } while (metadata == null);
-
- if (metadata.leader() == null) {
- throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
- }
+ PartitionMetadata metadata = getPartitionMetadata();
leadBroker = metadata.leader();
clientName = "Client_" + topic + "_" + partition;
@@ -134,16 +124,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
try {
readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
} catch (NotLeaderForPartitionException e) {
- do {
- metadata = findLeader(hosts, topic, partition);
-
- try {
- Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
- } catch (InterruptedException ie) {
- throw new InterruptedException("Establishing connection to Kafka failed");
- }
- } while (metadata == null);
- readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+ throw new RuntimeException("Unable to get offset",e);
}
try {
@@ -156,6 +137,38 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
}
}
+ private PartitionMetadata getPartitionMetadata() {
+ PartitionMetadata metadata;
+ int retry = 0;
+ int waitTime = consumerConfig.props().getInt(PersistentKafkaSource.WAIT_ON_FAILED_LEADER_MS_KEY, PersistentKafkaSource.WAIT_ON_FAILED_LEADER__MS_DEFAULT);
+ do {
+ metadata = findLeader(hosts, topic, partition);
+ /*try {
+ Thread.sleep(10000);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Establishing connection to Kafka failed", e);
+ } */
+ if(metadata == null) {
+ retry++;
+ if(retry == consumerConfig.props().getInt(PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_KEY, PersistentKafkaSource.MAX_FAILED_LEADER_RETRIES_DEFAULT)) {
+ throw new RuntimeException("Tried finding a leader "+retry+" times without success");
+ }
+ LOG.warn("Unable to get leader and partition metadata. Waiting {} ms until retrying. Retries so far {}",waitTime, retry-1);
+ try {
+ Thread.sleep(waitTime);
+ } catch (InterruptedException e) {
+ throw new RuntimeException("Establishing connection to Kafka failed", e);
+ }
+ }
+ } while (metadata == null);
+
+ if (metadata.leader() == null) {
+ throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
+ }
+
+ return metadata;
+ }
+
/**
* Sets the partition to read from.
*
@@ -185,18 +198,18 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
*
* @return next message as a byte array.
*/
- public byte[] next() throws InterruptedException {
+ public byte[] next() {
return nextWithOffset().getMessage();
}
- public boolean fetchHasNext() throws InterruptedException {
+ public boolean fetchHasNext() {
synchronized (fetchResponse) {
if (!iter.hasNext()) {
try {
resetFetchResponse(readOffset);
} catch (ClosedChannelException e) {
if (LOG.isWarnEnabled()) {
- LOG.warn("Got ClosedChannelException, trying to find new leader.");
+ LOG.warn("Got ClosedChannelException, trying to find new leader.", e);
}
findNewLeader();
}
@@ -213,7 +226,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
*
* @return next message and its offset.
*/
- public MessageWithMetadata nextWithOffset() throws InterruptedException {
+ public MessageWithMetadata nextWithOffset() {
synchronized (fetchResponse) {
if (!iter.hasNext()) {
@@ -243,7 +256,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
// Internal utilities
// --------------------------------------------------------------------------------------------
- private void resetFetchResponse(long offset) throws InterruptedException, ClosedChannelException {
+ private void resetFetchResponse(long offset) throws ClosedChannelException {
FetchRequest req = new FetchRequestBuilder().clientId(clientName)
.addFetch(topic, partition, offset, consumerConfig.fetchMessageMaxBytes()).build();
@@ -258,10 +271,18 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
if (code == ErrorMapping.OffsetOutOfRangeCode()) {
if (LOG.isErrorEnabled()) {
- LOG.error("Asked for invalid offset {}, setting the offset to the latest.", offset);
+ LOG.error("Asked for invalid offset {}", offset);
+ }
+ String reset = consumerConfig.autoOffsetReset();
+ if(reset.equals("smallest")) {
+ LOG.info("Setting read offset to beginning (smallest)");
+ readOffset = new BeginningOffset().getOffset(consumer, topic, partition, clientName);
+ } else if(reset.equals("largest")) {
+ LOG.info("Setting read offset to current offset (largest)");
+ readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
+ } else {
+ throw new RuntimeException("Unknown 'autooffset.reset' mode '"+reset+"' Supported values are 'smallest' and 'largest'.");
}
-
- readOffset = new CurrentOffset().getOffset(consumer, topic, partition, clientName);
}
findNewLeader();
@@ -270,15 +291,14 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
iter = fetchResponse.messageSet(topic, partition).iterator();
}
- private void findNewLeader() throws InterruptedException {
+ private void findNewLeader() {
consumer.close();
consumer = null;
leadBroker = findNewLeader(leadBroker, topic, partition);
consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), clientName);
}
- private PartitionMetadata findLeader(List<String> addresses, String a_topic,
- int a_partition) {
+ private PartitionMetadata findLeader(List<String> addresses, String topic, int partition) {
PartitionMetadata returnMetaData = null;
loop:
@@ -295,7 +315,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
SimpleConsumer consumer = null;
try {
consumer = new SimpleConsumer(host, port, consumerConfig.socketTimeoutMs(), consumerConfig.socketReceiveBufferBytes(), "leaderLookup");
- List<String> topics = Collections.singletonList(a_topic);
+ List<String> topics = Collections.singletonList(topic);
TopicMetadataRequest req = new TopicMetadataRequest(topics);
@@ -304,7 +324,7 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
List<TopicMetadata> metaData = resp.topicsMetadata();
for (TopicMetadata item : metaData) {
for (PartitionMetadata part : item.partitionsMetadata()) {
- if (part.partitionId() == a_partition) {
+ if (part.partitionId() == partition) {
returnMetaData = part;
break loop;
}
@@ -313,10 +333,9 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
} catch (Exception e) {
if (e instanceof ClosedChannelException) {
LOG.warn("Got ClosedChannelException while trying to communicate with Broker" +
- "[{}] to find Leader for [{}, {}]. Trying other replicas.", address, a_topic, a_partition);
+ "[{}] to find Leader for [{}, {}]. Trying other replicas.", address, topic, partition);
} else {
- throw new RuntimeException("Error communicating with Broker [" + address
- + "] to find Leader for [" + a_topic + ", " + a_partition + "]", e);
+ throw new RuntimeException("Error communicating with Broker [" + address + "] to find Leader for [" + topic + ", " + partition + "]", e);
}
} finally {
if (consumer != null) {
@@ -333,18 +352,18 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
return returnMetaData;
}
- private Broker findNewLeader(Broker a_oldLeader, String a_topic, int a_partition) throws InterruptedException {
+ private Broker findNewLeader(Broker oldLeader, String topic, int a_partition) {
for (int i = 0; i < 3; i++) {
if (LOG.isInfoEnabled()) {
LOG.info("Trying to find a new leader after Broker failure.");
}
boolean goToSleep = false;
- PartitionMetadata metadata = findLeader(replicaBrokers, a_topic, a_partition);
+ PartitionMetadata metadata = findLeader(replicaBrokers, topic, a_partition);
if (metadata == null) {
goToSleep = true;
} else if (metadata.leader() == null) {
goToSleep = true;
- } else if (a_oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
+ } else if (oldLeader.host().equalsIgnoreCase(metadata.leader().host()) && i == 0) {
// first time through if the leader hasn't changed give ZooKeeper a second to recover
// second time, assume the broker did recover before failover, or it was a non-Broker issue
//
@@ -362,4 +381,12 @@ public class KafkaSinglePartitionIterator implements KafkaConsumerIterator, Seri
throw new RuntimeException("Unable to find new leader after Broker failure.");
}
+ public int getId() {
+ return this.partition;
+ }
+
+ @Override
+ public String toString() {
+ return "SinglePartitionIterator{partition="+partition+" readOffset="+readOffset+"}";
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
index fef6325..3aec7ff 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/GivenOffset.java
@@ -19,6 +19,9 @@ package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
import kafka.javaapi.consumer.SimpleConsumer;
+/**
+ * Offset given by a message read from Kafka.
+ */
public class GivenOffset extends KafkaOffset {
private static final long serialVersionUID = 1L;
http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
index ac45e32..2eaa2b8 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/offset/KafkaOffset.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka.api.simple.offset;
import java.io.Serializable;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +30,9 @@ import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;
+/**
+ * Superclass for various kinds of KafkaOffsets.
+ */
public abstract class KafkaOffset implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(KafkaOffset.class);
@@ -38,6 +42,15 @@ public abstract class KafkaOffset implements Serializable {
public abstract long getOffset(SimpleConsumer consumer, String topic, int partition,
String clientName);
+ /**
+ *
+ * @param consumer
+ * @param topic
+ * @param partition
+ * @param whichTime Type of offset request (latest time / earliest time)
+ * @param clientName
+ * @return
+ */
protected long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName) {
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
@@ -49,22 +62,25 @@ public abstract class KafkaOffset implements Serializable {
OffsetResponse response = consumer.getOffsetsBefore(request);
while (response.hasError()) {
- switch (response.errorCode(topic, partition)) {
+ int errorCode = response.errorCode(topic, partition);
+ LOG.warn("Response has error. Error code "+errorCode);
+ switch (errorCode) {
case 6:
case 3:
LOG.warn("Kafka broker trying to fetch from a non-leader broker.");
break;
default:
- throw new RuntimeException("Error fetching data from Kafka broker. Reason: "
- + response.errorCode(topic, partition));
+ throw new RuntimeException("Error fetching data from Kafka broker. Error code " + errorCode);
}
- request = new kafka.javaapi.OffsetRequest(requestInfo,
- kafka.api.OffsetRequest.CurrentVersion(), clientName);
+ request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
response = consumer.getOffsetsBefore(request);
}
long[] offsets = response.offsets(topic, partition);
+ if(offsets.length > 1) {
+ LOG.warn("The offset response unexpectedly contained more than one offset: "+ Arrays.toString(offsets) + " Using only first one");
+ }
return offsets[0];
}
http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
index 9ecd0a3..87c6a34 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java
@@ -670,8 +670,7 @@ public class KafkaITCase {
createTestTopic(topic, 2, 2);
KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
- final String leaderToShutDown =
- kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+ final String leaderToShutDown = kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
final Thread brokerShutdown = new Thread(new Runnable() {
@Override
@@ -711,13 +710,14 @@ public class KafkaITCase {
consuming.addSink(new SinkFunction<String>() {
int elCnt = 0;
int start = 0;
- int numOfMessagesToReceive = 100;
+ int numOfMessagesToBeCorrect = 100;
+ int stopAfterMessages = 150;
- BitSet validator = new BitSet(numOfMessagesToReceive + 1);
+ BitSet validator = new BitSet(numOfMessagesToBeCorrect + 1);
@Override
public void invoke(String value) throws Exception {
- LOG.debug("Got " + value);
+ LOG.info("Got message = " + value + " leader has shut down "+leaderHasShutDown+" el cnt = "+elCnt+" to rec"+ numOfMessagesToBeCorrect);
String[] sp = value.split("-");
int v = Integer.parseInt(sp[1]);
@@ -736,16 +736,15 @@ public class KafkaITCase {
shutdownKafkaBroker = true;
}
- if (elCnt == numOfMessagesToReceive && leaderHasShutDown) {
- // check if everything in the bitset is set to true
- int nc;
- if ((nc = validator.nextClearBit(0)) != numOfMessagesToReceive) {
- throw new RuntimeException("The bitset was not set to 1 on all elements. Next clear:" + nc + " Set: " + validator);
+ if(leaderHasShutDown) { // it only makes sense to check once the shutdown is completed
+ if (elCnt >= stopAfterMessages ) {
+ // check if everything in the bitset is set to true
+ int nc;
+ if ((nc = validator.nextClearBit(0)) < numOfMessagesToBeCorrect) {
+ throw new RuntimeException("The bitset was not set to 1 on all elements to be checked. Next clear:" + nc + " Set: " + validator);
+ }
+ throw new SuccessException();
}
- throw new SuccessException();
- } else if (elCnt == numOfMessagesToReceive) {
- numOfMessagesToReceive += 50;
- LOG.info("Waiting for more messages till {}", numOfMessagesToReceive);
}
}
});
@@ -759,7 +758,9 @@ public class KafkaITCase {
LOG.info("Starting source.");
int cnt = 0;
while (running) {
- collector.collect("kafka-" + cnt++);
+ String msg = "kafka-" + cnt++;
+ collector.collect(msg);
+ LOG.info("sending message = "+msg);
if ((cnt - 1) % 20 == 0) {
LOG.debug("Sending message #{}", cnt - 1);
http://git-wip-us.apache.org/repos/asf/flink/blob/ea710204/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
index dc20726..9ede613 100644
--- a/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
+++ b/flink-staging/flink-streaming/flink-streaming-connectors/src/test/resources/log4j-test.properties
@@ -16,7 +16,7 @@
# limitations under the License.
################################################################################
-log4j.rootLogger=OFF, testlogger
+log4j.rootLogger=INFO, testlogger
log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
log4j.appender.testlogger.target = System.err