Posted to commits@crunch.apache.org by mk...@apache.org on 2016/10/20 02:35:02 UTC

crunch git commit: CRUNCH-621: Added a check to hasPendingData so that, after a large number of requests returning no data, the reader verifies data is still available.

Repository: crunch
Updated Branches:
  refs/heads/master 5944f81b2 -> ef8d60f2b


CRUNCH-621: Added a check to hasPendingData so that, after a large number of requests returning no data, the reader verifies data is still available.
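
For context, the new limit is controlled by a Hadoop configuration property introduced in this patch. Below is a minimal sketch of raising it for a job; the wrapper class and method are illustrative only, while the property constant and its default of 10 come from KafkaUtils in the diff.

    import org.apache.crunch.kafka.KafkaUtils;
    import org.apache.hadoop.conf.Configuration;

    public class EmptyRetryConfigExample {
      // Allow more consecutive empty poll responses before the reader re-checks
      // the broker's earliest offset (the patch defaults this limit to 10).
      public static Configuration withEmptyRetryLimit(Configuration conf, int attempts) {
        conf.setInt(KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, attempts);
        return conf;
      }
    }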


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/ef8d60f2
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/ef8d60f2
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/ef8d60f2

Branch: refs/heads/master
Commit: ef8d60f2b51e552a0d2296d2b4711008cd0b58ee
Parents: 5944f81
Author: Micah Whitacre <mk...@gmail.com>
Authored: Tue Sep 13 10:35:35 2016 -0500
Committer: Micah Whitacre <mk...@apache.org>
Committed: Wed Oct 19 21:12:49 2016 -0500

----------------------------------------------------------------------
 .../org/apache/crunch/kafka/KafkaUtils.java     | 13 +++++
 .../kafka/inputformat/KafkaRecordReader.java    | 52 +++++++++++++++--
 .../kafka/inputformat/KafkaRecordReaderIT.java  | 61 ++++++++++++++++++++
 3 files changed, 122 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/ef8d60f2/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
index 9065bee..f3da5e9 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -71,6 +71,19 @@ public class KafkaUtils {
   public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
 
   /**
+   * Configuration property for the number of consecutive retry attempts that will be made against Kafka when polls
+   * return empty responses.
+   */
+  public static final String KAFKA_EMPTY_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.empty.attempts";
+
+  /**
+   * Default number of empty retry attempts.
+   */
+  public static final int KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT = 10;
+  public static final String KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT_STRING =
+    Integer.toString(KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
+
+  /**
    * Converts the provided {@code config} into a {@link Properties} object to connect with Kafka.
    * @param config the config to read properties
    * @return a properties instance populated with all of the values inside the provided {@code config}.
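
The new key and default above are paired when the effective limit is resolved from a job's configuration. A minimal sketch of that lookup, mirroring how KafkaRecordReader reads it in the next file of this diff (the wrapper class here is illustrative):

    import org.apache.hadoop.conf.Configuration;

    import static org.apache.crunch.kafka.KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY;
    import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;

    public class EmptyRetryLimitLookup {
      // Returns the configured limit of consecutive empty poll responses, or the default of 10.
      public static int emptyRetryLimit(Configuration config) {
        return config.getInt(KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
      }
    }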

http://git-wip-us.apache.org/repos/asf/crunch/blob/ef8d60f2/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
index ad73217..14c8030 100644
--- a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
@@ -17,7 +17,9 @@
  */
 package org.apache.crunch.kafka.inputformat;
 
+import kafka.api.OffsetRequest;
 import org.apache.crunch.CrunchRuntimeException;
+import org.apache.crunch.kafka.KafkaUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.RecordReader;
@@ -34,11 +36,15 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
 
 import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
 import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_EMPTY_RETRY_ATTEMPTS_KEY;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT;
 import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
 
 /**
@@ -59,15 +65,23 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
   private long startingOffset;
   private long currentOffset;
   private int maxNumberAttempts;
+  private Properties connectionProperties;
+  private TopicPartition topicPartition;
+  private int concurrentEmptyResponses;
+  private int maxConcurrentEmptyResponses;
 
   @Override
   public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
-    consumer = new KafkaConsumer<>(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
     if(!(inputSplit instanceof KafkaInputSplit)){
       throw new CrunchRuntimeException("InputSplit for RecordReader is not valid split type.");
     }
     KafkaInputSplit split = (KafkaInputSplit) inputSplit;
-    TopicPartition topicPartition = split.getTopicPartition();
+    topicPartition = split.getTopicPartition();
+
+    connectionProperties = getKafkaConnectionProperties(taskAttemptContext.getConfiguration());
+
+    consumer = new KafkaConsumer<>(connectionProperties);
+
     consumer.assign(Collections.singletonList(topicPartition));
     //suggested hack to gather info without gathering data
     consumer.poll(0);
@@ -86,6 +100,8 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
     Configuration config = taskAttemptContext.getConfiguration();
     consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
     maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
+    maxConcurrentEmptyResponses = config.getInt(KAFKA_EMPTY_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_EMPTY_ATTEMPTS_DEFAULT);
+    concurrentEmptyResponses = 0;
   }
 
   @Override
@@ -130,7 +146,19 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
   private boolean hasPendingData(){
     //offset range is exclusive at the end which means the ending offset is one higher
     // than the actual physical last offset
-    return currentOffset < endingOffset-1;
+
+    boolean hasPending = currentOffset < endingOffset-1;
+
+    if(concurrentEmptyResponses > maxConcurrentEmptyResponses){
+      long earliest = getEarliestOffset();
+      if(earliest == endingOffset){
+        LOG.warn("Possible data loss for {} as earliest {} is equal to {} and greater than expected current {}.",
+          new Object[]{topicPartition, earliest, endingOffset, currentOffset});
+        return false;
+      }
+    }
+
+    return hasPending;
   }
 
   private Iterator<ConsumerRecord<K, V>> getRecords() {
@@ -151,11 +179,14 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
           }
         }
         if((records == null || records.isEmpty()) && hasPendingData()){
-          LOG.warn("No records retrieved but pending offsets to consume therefore polling again.");
+          concurrentEmptyResponses++;
+          LOG.warn("No records retrieved but pending offsets to consume therefore polling again. Attempt {}/{}",
+            concurrentEmptyResponses, maxConcurrentEmptyResponses);
         }else{
           success = true;
         }
       }
+      concurrentEmptyResponses = 0;
 
       if(records == null || records.isEmpty()){
         LOG.info("No records retrieved from Kafka therefore nothing to iterate over.");
@@ -171,6 +202,19 @@ public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
     return consumer;
   }
 
+  protected long getEarliestOffset(){
+    Map<TopicPartition, Long> brokerOffsets = KafkaUtils
+      .getBrokerOffsets(connectionProperties, OffsetRequest.EarliestTime(), topicPartition.topic());
+    Long offset = brokerOffsets.get(topicPartition);
+    if(offset == null){
+      LOG.debug("Unable to determine earliest offset for {} so returning earliest {}", topicPartition,
+        OffsetRequest.EarliestTime());
+      return OffsetRequest.EarliestTime();
+    }
+    LOG.debug("Earliest offset for {} is {}", topicPartition, offset);
+    return offset;
+  }
+
   @Override
   public void close() throws IOException {
     LOG.debug("Closing the record reader.");

http://git-wip-us.apache.org/repos/asf/crunch/blob/ef8d60f2/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
index 15970c1..c15b4d9 100644
--- a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
@@ -294,6 +294,47 @@ public class KafkaRecordReaderIT {
     assertThat(keysRead.size(), is(keys.size()));
   }
 
+  @Test
+  public void pollEarliestEqualsEnding() throws IOException, InterruptedException {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    Set<String> keysRead = new HashSet<>();
+    //attempt to read data from all splits
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
+      KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
+              partitionInfo.getValue().first(), partitionInfo.getValue().second());
+
+      when(consumer.poll(Matchers.anyLong())).thenReturn(ConsumerRecords.<String, String>empty());
+      KafkaRecordReader<String, String> recordReader = new EarliestRecordReader<>(consumer,
+              partitionInfo.getValue().second());
+      recordReader.initialize(split, context);
+
+      int numRecordsFound = 0;
+      while (recordReader.nextKeyValue()) {
+        keysRead.add(recordReader.getCurrentKey());
+        numRecordsFound++;
+      }
+      recordReader.close();
+
+      //assert that no records were read since the earliest offset equals the ending offset
+      assertThat(numRecordsFound, is(0));
+    }
+
+    //validate that no unique keys were read even though data was written.
+    assertThat(keysRead.size(), is(0));
+  }
+
 
   private static class NullAtStartKafkaRecordReader<K, V> extends KafkaRecordReader<K, V>{
 
@@ -342,4 +383,24 @@ public class KafkaRecordReaderIT {
     }
   }
 
+  private static class EarliestRecordReader<K,V> extends KafkaRecordReader<K, V>{
+
+    private final long earliest;
+    private final Consumer consumer;
+
+    public EarliestRecordReader(Consumer consumer, long earliest){
+      this.earliest = earliest;
+      this.consumer = consumer;
+    }
+
+    @Override
+    protected Consumer<K, V> getConsumer() {
+      return consumer;
+    }
+
+    @Override
+    protected long getEarliestOffset() {
+      return earliest;
+    }
+  }
 }