Posted to commits@crunch.apache.org by mk...@apache.org on 2016/10/20 02:35:02 UTC
crunch git commit: CRUNCH-621: Added a check to hasPendingData that, after a large number of requests returning no data, verifies data still exists.
Repository: crunch
Updated Branches:
refs/heads/master 5944f81b2 -> ef8d60f2b
CRUNCH-621: Added a check to hasPendingData that, after a large number of requests returning no data, verifies data still exists.
Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ef8d60f2
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ef8d60f2
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ef8d60f2
Branch: refs/heads/master
Commit: ef8d60f2b51e552a0d2296d2b4711008cd0b58ee
Parents: 5944f81
Author: Micah Whitacre <mk...@gmail.com>
Authored: Tue Sep 13 10:35:35 2016 -0500
Committer: Micah Whitacre <mk...@apache.org>
Committed: Wed Oct 19 21:12:49 2016 -0500
----------------------------------------------------------------------
.../org/apache/crunch/kafka/KafkaUtils.java | 13 +++++
.../kafka/inputformat/KafkaRecordReader.java | 52 +++++++++++++++--
.../kafka/inputformat/KafkaRecordReaderIT.java | 61 ++++++++++++++++++++
3 files changed, 122 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
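In essence, the change makes the reader count consecutive empty polls; once a configurable threshold is exceeded, it asks the broker for its earliest available offset, and if that earliest offset has advanced all the way to the split's ending offset, the expected data is gone (e.g. expired by retention) and the reader stops instead of polling forever. A simplified sketch of the guard, condensed from the diff below (not the verbatim committed code):

// Sketch of the new guard in hasPendingData(), condensed from the diff below.
private boolean hasPendingData() {
  // the ending offset is exclusive, so the last physical offset is endingOffset - 1
  boolean hasPending = currentOffset < endingOffset - 1;
  if (concurrentEmptyResponses > maxConcurrentEmptyResponses) {
    long earliest = getEarliestOffset();
    if (earliest == endingOffset) {
      // broker retains nothing below our ending offset: likely data loss, give up
      return false;
    }
  }
  return hasPending;
}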
http://git-wip-us.apache.org/repos/asf/crunch/blob/ef8d60f2/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index 9065bee..f3da5e9 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -71,6 +71,19 @@ public class KafkaUtils {
public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
/**
+ * Configuration property for the number of retry attempts that will be made to Kafka in the event of getting empty
+ * responses.
+ */
+ public static final String KAFKA_EMPTY_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.empty.attempts";
+
+ /**
+ * Default number of empty retry attempts.
+ */
+ public static final int KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT = 10;
+ public static final String KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT_STRING =
+ Integer.toString(KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
+
+ /**
* Converts the provided {@code config} into a {@link Properties} object to connect with Kafka.
* @param config the config to read properties
* @return a properties instance populated with all of the values inside the provided {@code config}.
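A job that wants a different threshold can set the new property on its Hadoop Configuration before submission. A minimal illustration (the value 20 and the surrounding job setup are hypothetical; the key and default come from the constants above):

Configuration conf = new Configuration();
// allow 20 consecutive empty polls before checking the earliest broker offset;
// the default is KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT, i.e. 10
conf.setInt(KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, 20);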
http://git-wip-us.apache.org/repos/asf/crunch/blob/ef8d60f2/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
index ad73217..14c8030 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
@@ -17,7 +17,9 @@
*/
package org.apache.crunch.kafka.inputformat;
+import kafka.api.OffsetRequest;
import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.RecordReader;
@@ -34,11 +36,15 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY;
import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;
import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
/**
@@ -59,15 +65,23 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
private long startingOffset;
private long currentOffset;
private int maxNumberAttempts;
+ private Properties connectionProperties;
+ private TopicPartition topicPartition;
+ private int concurrentEmptyResponses;
+ private int maxConcurrentEmptyResponses;
@Override
public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
- consumer = new KafkaConsumer<>(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
if(!(inputSplit instanceof KafkaInputSplit)){
throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type.");
}
KafkaInputSplit split = (KafkaInputSplit) inputSplit;
- TopicPartition topicPartition = split.getTopicPartition();
+ topicPartition = split.getTopicPartition();
+
+ connectionProperties = getKafkaConnectionProperties(taskAttemptContext.getConfiguration());
+
+ consumer = new KafkaConsumer<>(connectionProperties);
+
consumer.assign(Collections.singletonList(topicPartition));
//suggested hack to gather info without gathering data
consumer.poll(0);
@@ -86,6 +100,8 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
Configuration config = taskAttemptContext.getConfiguration();
consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
+ maxConcurrentEmptyResponses = config.getInt(KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
+ concurrentEmptyResponses = 0;
}
@Override
@@ -130,7 +146,19 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
private boolean hasPendingData(){
//offset range is exclusive at the end which means the ending offset is one higher
// than the actual physical last offset
- return currentOffset < endingOffset-1;
+
+ boolean hasPending = currentOffset < endingOffset-1;
+
+ if(concurrentEmptyResponses > maxConcurrentEmptyResponses){
+ long earliest = getEarliestOffset();
+ if(earliest == endingOffset){
+ LOG.warn("Possible data loss for {} as earliest {} is equal to {} and greater than expected current {}.",
+ new Object[]{topicPartition, earliest, endingOffset, currentOffset});
+ return false;
+ }
+ }
+
+ return hasPending;
}
private Iterator<ConsumerRecord<K, V>> getRecords() {
@@ -151,11 +179,14 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
}
}
if((records == null || records.isEmpty()) && hasPendingData()){
- LOG.warn("No records retrieved but pending offsets to consume therefore polling again.");
+ concurrentEmptyResponses++;
+ LOG.warn("No records retrieved but pending offsets to consume therefore polling again. Attempt {}/{}",
+ concurrentEmptyResponses, maxConcurrentEmptyResponses);
}else{
success = true;
}
}
+ concurrentEmptyResponses = 0;
if(records == null || records.isEmpty()){
LOG.info("No records retrieved from Kafka therefore nothing to iterate over.");
@@ -171,6 +202,19 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
return consumer;
}
+ protected long getEarliestOffset(){
+ Map<TopicPartition, Long> brokerOffsets = KafkaUtils
+ .getBrokerOffsets(connectionProperties, OffsetRequest.EarliestTime(), topicPartition.topic());
+ Long offset = brokerOffsets.get(topicPartition);
+ if(offset == null){
+ LOG.debug("Unable to determine earliest offset for {} so returning earliest {}", topicPartition,
+ OffsetRequest.EarliestTime());
+ return OffsetRequest.EarliestTime();
+ }
+ LOG.debug("Earliest offset for {} is {}", topicPartition, offset);
+ return offset;
+ }
+
@Override
public void close() throws IOException {
LOG.debug("Closing the record reader.");
http://git-wip-us.apache.org/repos/asf/crunch/blob/ef8d60f2/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
index 15970c1..c15b4d9 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
@@ -294,6 +294,47 @@ public class KafkaRecordReaderIT {
assertThat(keysRead.size(), is(keys.size()));
}
+ @Test
+ public void pollEarliestEqualsEnding() throws IOException, InterruptedException {
+ List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+
+ Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+ Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+ Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+ for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+ Long endingOffset = endOffsets.get(entry.getKey());
+ offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+ }
+
+ KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+ Set<String> keysRead = new HashSet<>();
+ //read all data from all splits
+ for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
+ KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
+ partitionInfo.getValue().first(), partitionInfo.getValue().second());
+
+ when(consumer.poll(Matchers.anyLong())).thenReturn(ConsumerRecords.<String, String>empty());
+ KafkaRecordReader<String, String> recordReader = new EarliestRecordReader<>(consumer,
+ partitionInfo.getValue().second());
+ recordReader.initialize(split, context);
+
+ int numRecordsFound = 0;
+ while (recordReader.nextKeyValue()) {
+ keysRead.add(recordReader.getCurrentKey());
+ numRecordsFound++;
+ }
+ recordReader.close();
+
+ //assert that no records were read since the earliest offset equals the ending offset
+ assertThat(numRecordsFound, is(0));
+ }
+
+ //validate that no keys were read even though data was written to the topic
+ assertThat(keysRead.size(), is(0));
+ }
+
private static class NullAtStartKafkaRecordReader<K, V> extends KafkaRecordReader<K, V>{
@@ -342,4 +383,24 @@ public class KafkaRecordReaderIT {
}
}
+ private static class EarliestRecordReader<K,V> extends KafkaRecordReader<K, V>{
+
+ private final long earliest;
+ private final Consumer consumer;
+
+ public EarliestRecordReader(Consumer consumer, long earliest){
+ this.earliest = earliest;
+ this.consumer = consumer;
+ }
+
+ @Override
+ protected Consumer<K, V> getConsumer() {
+ return consumer;
+ }
+
+ @Override
+ protected long getEarliestOffset() {
+ return earliest;
+ }
+ }
}