You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by su...@apache.org on 2021/06/30 00:21:48 UTC

[gobblin] branch master updated: [GOBBLIN-1483] Handle null valued ConsumerRecords in Kafka Streaming Extractor[]

This is an automated email from the ASF dual-hosted git repository.

suvasude pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 15c88f3  [GOBBLIN-1483] Handle null valued ConsumerRecords in Kafka Streaming Extractor[]
15c88f3 is described below

commit 15c88f33330f60f62b18c367d02220839d7ab1a6
Author: suvasude <su...@linkedin.biz>
AuthorDate: Tue Jun 29 17:21:41 2021 -0700

    [GOBBLIN-1483] Handle null valued ConsumerRecords in Kafka Streaming Extractor[]
    
    Closes #3323 from sv2000/npeKafkaNullRecord
---
 .../extract/kafka/KafkaExtractorStatsTracker.java  | 31 +++++++++++
 .../extract/kafka/KafkaStreamingExtractor.java     | 40 +++++++++-----
 .../kafka/KafkaExtractorStatsTrackerTest.java      | 17 ++++++
 .../extract/kafka/KafkaStreamTestUtils.java        | 62 ++++++++++------------
 .../extract/kafka/KafkaStreamingExtractorTest.java | 36 +++++++++----
 5 files changed, 131 insertions(+), 55 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
index 25c94d6..b1bf197 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTracker.java
@@ -66,6 +66,7 @@ public class KafkaExtractorStatsTracker {
   private static final String MIN_LOG_APPEND_TIMESTAMP = "minLogAppendTimestamp";
   private static final String MAX_LOG_APPEND_TIMESTAMP = "maxLogAppendTimestamp";
   private static final String UNDECODABLE_MESSAGE_COUNT = "undecodableMessageCount";
+  private static final String NULL_RECORD_COUNT = "nullRecordCount";
   private static final String PARTITION_TOTAL_SIZE = "partitionTotalSize";
   private static final String AVG_RECORD_PULL_TIME = "avgRecordPullTime";
   private static final String AVG_RECORD_SIZE = "avgRecordSize";
@@ -92,6 +93,9 @@ public class KafkaExtractorStatsTracker {
   //TopicPartitions.
   @Getter
   private int undecodableMessageCount = 0;
+  @Getter
+  private int nullRecordCount = 0;
+
   private List<KafkaPartition> partitions;
   private long maxPossibleLatency;
 
@@ -157,6 +161,7 @@ public class KafkaExtractorStatsTracker {
   @Data
   public static class ExtractorStats {
     private long decodingErrorCount = -1L;
+    private long nullRecordCount = -1L;
     private double avgMillisPerRecord = -1;
     private long avgRecordSize;
     private long elapsedTime;
@@ -198,6 +203,15 @@ public class KafkaExtractorStatsTracker {
   }
 
   /**
+   *
+   * @param partitionIdx index of Kafka topic partition.
+   * @return the number of null valued records for a given partition id.
+   */
+  public Long getNullRecordCount(int partitionIdx) {
+    return this.statsMap.get(this.partitions.get(partitionIdx)).getNullRecordCount();
+  }
+
+  /**
    * Called when the KafkaExtractor encounters an undecodeable record.
    */
   public void onUndecodeableRecord(int partitionIdx) {
@@ -206,6 +220,22 @@ public class KafkaExtractorStatsTracker {
     incrementErrorCount(partitionIdx);
   }
 
+  public void onNullRecord(int partitionIdx) {
+    this.nullRecordCount++;
+    incrementNullRecordCount(partitionIdx);
+  }
+
+  private void incrementNullRecordCount(int partitionIdx) {
+    this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
+      if (v.nullRecordCount < 0) {
+        v.nullRecordCount = 1;
+      } else {
+        v.nullRecordCount++;
+      }
+      return v;
+    });
+  }
+
   private void incrementErrorCount(int partitionIdx) {
     this.statsMap.computeIfPresent(this.partitions.get(partitionIdx), (k, v) -> {
       if (v.decodingErrorCount < 0) {
@@ -412,6 +442,7 @@ public class KafkaExtractorStatsTracker {
         Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getFetchMessageBufferTime())));
     tagsForPartition.put(READ_RECORD_TIME, Long.toString(TimeUnit.NANOSECONDS.toMillis(stats.getReadRecordTime())));
     tagsForPartition.put(UNDECODABLE_MESSAGE_COUNT, Long.toString(stats.getDecodingErrorCount()));
+    tagsForPartition.put(NULL_RECORD_COUNT, Long.toString(stats.getNullRecordCount()));
     tagsForPartition.put(LAST_RECORD_HEADER_TIMESTAMP, Long.toString(stats.getLastSuccessfulRecordHeaderTimestamp()));
 
     // Commit avg time to pull a record for each partition
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
index 9f2ac76..bc2904b 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractor.java
@@ -87,7 +87,8 @@ import static org.apache.gobblin.source.extractor.extract.kafka.workunit.packer.
 public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableKafkaRecord> {
   public static final String DATASET_KEY = "dataset";
   public static final String DATASET_PARTITION_KEY = "datasetPartition";
-  private static final Long MAX_LOG_DECODING_ERRORS = 5L;
+  private static final Long MAX_LOG_ERRORS = 100L;
+
   private static final String KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES_KEY =
       "gobblin.kafka.extractor.statsReportingIntervalMinutes";
   private static final Long DEFAULT_KAFKA_EXTRACTOR_STATS_REPORTING_INTERVAL_MINUTES = 1L;
@@ -350,20 +351,33 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
     this.readStartTime = System.nanoTime();
     long fetchStartTime = System.nanoTime();
     try {
-      while (this.messageIterator == null || !this.messageIterator.hasNext()) {
-        Long currentTime = System.currentTimeMillis();
-        //it's time to flush, so break the while loop and directly return null
-        if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
-          return new FlushRecordEnvelope();
+      DecodeableKafkaRecord kafkaConsumerRecord;
+      while(true) {
+        while (this.messageIterator == null || !this.messageIterator.hasNext()) {
+          Long currentTime = System.currentTimeMillis();
+          //it's time to flush, so break the while loop and directly return null
+          if ((currentTime - timeOfLastFlush) > this.flushIntervalMillis) {
+            return new FlushRecordEnvelope();
+          }
+          try {
+            fetchStartTime = System.nanoTime();
+            this.messageIterator = this.kafkaConsumerClient.consume();
+          } catch (Exception e) {
+            log.error("Failed to consume from Kafka", e);
+          }
         }
-        try {
-          fetchStartTime = System.nanoTime();
-          this.messageIterator = this.kafkaConsumerClient.consume();
-        } catch (Exception e) {
-          log.error("Failed to consume from Kafka", e);
+        kafkaConsumerRecord = (DecodeableKafkaRecord) this.messageIterator.next();
+        if (kafkaConsumerRecord.getValue() != null) {
+          break;
+        } else {
+          //Filter the null-valued records early, so that they do not break the pipeline downstream.
+          if (shouldLogError()) {
+            log.error("Encountered a null-valued record at offset: {}, partition: {}", kafkaConsumerRecord.getOffset(),
+                kafkaConsumerRecord.getPartition());
+          }
+          this.statsTracker.onNullRecord(this.partitionIdToIndexMap.get(kafkaConsumerRecord.getPartition()));
         }
       }
-      DecodeableKafkaRecord kafkaConsumerRecord = (DecodeableKafkaRecord) this.messageIterator.next();
 
       int partitionIndex = this.partitionIdToIndexMap.get(kafkaConsumerRecord.getPartition());
       this.statsTracker.onFetchNextMessageBuffer(partitionIndex, fetchStartTime);
@@ -395,7 +409,7 @@ public class KafkaStreamingExtractor<S> extends FlushingExtractor<S, DecodeableK
   }
 
   private boolean shouldLogError() {
-    return this.statsTracker.getUndecodableMessageCount() <= MAX_LOG_DECODING_ERRORS;
+    return (this.statsTracker.getUndecodableMessageCount() + this.statsTracker.getNullRecordCount()) <= MAX_LOG_ERRORS;
   }
 
   @Override
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
index 9cecc59..bef77d8 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractorStatsTrackerTest.java
@@ -74,6 +74,23 @@ public class KafkaExtractorStatsTrackerTest {
   }
 
   @Test
+  public void testOnNullRecord() {
+    //Ensure that counters are initialized correctly
+    Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), -1);
+    Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), -1);
+
+    //Ensure that counters are updated correctly after 1st call to KafkaExtractorStatsTracker#onNullRecord()
+    this.extractorStatsTracker.onNullRecord(0);
+    Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), 1);
+    Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(1).longValue(), -1);
+
+    //Ensure that counters are updated correctly after 2nd call to KafkaExtractorStatsTracker#onUndecodeableRecord()
+    this.extractorStatsTracker.onNullRecord(0);
+    Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(0).longValue(), 2);
+    Assert.assertEquals(this.extractorStatsTracker.getNullRecordCount(1).longValue(), -1);
+  }
+
+  @Test
   public void testResetStartFetchEpochTime() {
     long currentTime = System.currentTimeMillis();
     this.extractorStatsTracker.resetStartFetchEpochTime(1);
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
index 731c0b1..25647a0 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamTestUtils.java
@@ -43,7 +43,6 @@ import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient;
 import org.apache.gobblin.kafka.client.KafkaConsumerRecord;
 import org.apache.gobblin.metrics.kafka.KafkaSchemaRegistry;
-import org.apache.gobblin.metrics.kafka.SchemaRegistryException;
 import org.apache.gobblin.source.extractor.extract.LongWatermark;
 import org.apache.gobblin.util.ConfigUtils;
 
@@ -78,23 +77,21 @@ public class KafkaStreamTestUtils {
    */
   public static class MockKafkaConsumerClient implements GobblinKafkaConsumerClient {
     public static final String NUM_PARTITIONS_ASSIGNED = "gobblin.kafka.streaming.numPartitions";
+    public static final String CAN_RETURN_NULL_VALUED_RECORDS = "gobblin.kafka.streaming.canReturnNulls";
 
     private final Map<KafkaPartition, Long> latestOffsets = Maps.newHashMap();
     private final Random random = new Random();
     private final String topicName;
+    private final boolean canReturnNullValuedRecords;
     private final List<Integer> partitionIds;
 
     protected MockKafkaConsumerClient(Config baseConfig) {
       this.topicName = baseConfig.getString(KafkaSource.TOPIC_NAME);
       int numPartitionsAssigned = ConfigUtils.getInt(baseConfig, NUM_PARTITIONS_ASSIGNED, 0);
+      this.canReturnNullValuedRecords = ConfigUtils.getBoolean(baseConfig, CAN_RETURN_NULL_VALUED_RECORDS, false);
       this.partitionIds = getPartitionIds(baseConfig, numPartitionsAssigned);
     }
 
-    public MockKafkaConsumerClient() {
-      topicName = "";
-      partitionIds = Lists.newArrayList();
-    }
-
     private List<Integer> getPartitionIds(Config baseConfig, int numPartitionsAssigned) {
       List<Integer> partitionIds = Lists.newArrayList();
       for (int i = 0; i < numPartitionsAssigned; i++) {
@@ -110,7 +107,7 @@ public class KafkaStreamTestUtils {
      */
     @Override
     public Iterator<KafkaConsumerRecord> consume() {
-      return new MockIterator(this.topicName, this.partitionIds);
+      return new MockIterator(this.topicName, this.partitionIds, this.canReturnNullValuedRecords);
     }
 
     @Override
@@ -125,14 +122,12 @@ public class KafkaStreamTestUtils {
     }
 
     @Override
-    public long getEarliestOffset(KafkaPartition partition)
-        throws KafkaOffsetRetrievalFailureException {
+    public long getEarliestOffset(KafkaPartition partition) {
       return 0;
     }
 
     @Override
-    public long getLatestOffset(KafkaPartition partition)
-        throws KafkaOffsetRetrievalFailureException {
+    public long getLatestOffset(KafkaPartition partition) {
       return 0;
     }
 
@@ -180,23 +175,23 @@ public class KafkaStreamTestUtils {
     }
 
     @Override
-    protected Schema fetchSchemaByKey(String key) throws SchemaRegistryException {
+    protected Schema fetchSchemaByKey(String key) {
       return null;
     }
 
     @Override
-    public Schema getLatestSchemaByTopic(String topic) throws SchemaRegistryException {
+    public Schema getLatestSchemaByTopic(String topic) {
       return latestSchema;
     }
 
     @Override
-    public String register(Schema schema) throws SchemaRegistryException {
+    public String register(Schema schema) {
       return null;
     }
 
     @Override
-    public String register(Schema schema, String name) throws SchemaRegistryException {
-      this.latestSchema = schema;
+    public String register(Schema schema, String name) {
+      latestSchema = schema;
       return schema.toString();
     }
   }
@@ -209,20 +204,18 @@ public class KafkaStreamTestUtils {
     }
 
     @Override
-    public String register(String name, Schema schema)
-        throws org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+    public String register(String name, Schema schema) {
       this.latestSchema = schema;
       return schema.toString();
     }
 
     @Override
-    public Schema getById(String id) throws IOException, org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+    public Schema getById(String id) {
       return null;
     }
 
     @Override
-    public Schema getLatestSchema(String name)
-        throws IOException, org.apache.gobblin.kafka.schemareg.SchemaRegistryException {
+    public Schema getLatestSchema(String name) {
       return this.latestSchema;
     }
 
@@ -239,7 +232,7 @@ public class KafkaStreamTestUtils {
    * partition ids.
    */
   public static class MockIterator implements Iterator<KafkaConsumerRecord> {
-    //Schema for LiKafka10ConsumerRecords. TODO: Enhance the iterator to return random records
+    // Schema for LiKafka10ConsumerRecords. TODO: Enhance the iterator to return random records
     // according to a given schema.
     private static final String SCHEMA =
         "{\"namespace\": \"example.avro\",\n" + " \"type\": \"record\",\n" + " \"name\": \"user\",\n"
@@ -247,22 +240,23 @@ public class KafkaStreamTestUtils {
             + "     {\"name\": \"DUMMY\", \"type\": [\"null\",\"string\"]}\n" + " ]\n" + "}";
 
     private final Schema schema = new Schema.Parser().parse(SCHEMA);
-    private final Random random = new Random();
     private final String topicName;
     private final long maxNumRecords;
     private final List<Integer> partitionIds;
     private final long[] nextOffsets;
+    private final boolean canReturnNullRecords;
     private long numRecordsReturnedSoFar;
     private int partitionIdx = 0;
 
-    public MockIterator(String topicName, List<Integer> partitionIds) {
-      this(topicName, partitionIds, Long.MAX_VALUE);
+    public MockIterator(String topicName, List<Integer> partitionIds, boolean canReturnNullRecords) {
+      this(topicName, partitionIds, canReturnNullRecords, Long.MAX_VALUE);
     }
 
-    public MockIterator(String topicName, List<Integer> partitionIds, long numRecords) {
+    public MockIterator(String topicName, List<Integer> partitionIds, boolean canReturnNullRecords, long numRecords) {
       this.topicName = topicName;
       this.maxNumRecords = numRecords;
       this.partitionIds = partitionIds;
+      this.canReturnNullRecords = canReturnNullRecords;
       this.nextOffsets = new long[partitionIds.size()];
     }
 
@@ -287,10 +281,10 @@ public class KafkaStreamTestUtils {
     @Override
     public KafkaConsumerRecord next() {
       this.numRecordsReturnedSoFar++;
-      return getMockConsumerRecord(this.numRecordsReturnedSoFar);
+      return getMockConsumerRecord();
     }
 
-    private KafkaConsumerRecord getMockConsumerRecord(long numRecordsReturnedSoFar) {
+    private KafkaConsumerRecord getMockConsumerRecord() {
       DecodeableKafkaRecord mockRecord = Mockito.mock(DecodeableKafkaRecord.class);
       Mockito.when(mockRecord.getValue()).thenReturn(getRecord());
       Mockito.when(mockRecord.getTopic()).thenReturn(topicName);
@@ -303,12 +297,14 @@ public class KafkaStreamTestUtils {
     }
 
     private GenericRecord getRecord() {
-      GenericRecord record = new GenericData.Record(schema);
-      record.put("name", UUID.randomUUID());
-      return record;
+      if ((!this.canReturnNullRecords) || (this.numRecordsReturnedSoFar % 2 == 0)) {
+        GenericRecord record = new GenericData.Record(schema);
+        record.put("name", UUID.randomUUID());
+        return record;
+      } else {
+        return null;
+      }
     }
   }
-
-
 }
 
diff --git a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
index 927fe74..922d130 100644
--- a/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
+++ b/gobblin-modules/gobblin-kafka-common/src/test/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaStreamingExtractorTest.java
@@ -26,9 +26,12 @@ import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
 import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.kafka.client.DecodeableKafkaRecord;
 import org.apache.gobblin.publisher.DataPublisher;
 import org.apache.gobblin.source.extractor.DataRecordException;
 import org.apache.gobblin.source.extractor.extract.FlushingExtractor;
+import org.apache.gobblin.stream.RecordEnvelope;
+import org.apache.gobblin.stream.StreamEntity;
 
 
 public class KafkaStreamingExtractorTest {
@@ -37,9 +40,9 @@ public class KafkaStreamingExtractorTest {
 
   @BeforeClass
   public void setUp() {
-    WorkUnitState state = KafkaExtractorUtils.getWorkUnitState("testTopic", numPartitions);
-    state.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS, TestDataPublisher.class.getName());
-    this.streamingExtractor = new KafkaStreamingExtractor(state);
+    WorkUnitState state1 = KafkaExtractorUtils.getWorkUnitState("testTopic", numPartitions);
+    state1.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS, TestDataPublisher.class.getName());
+    this.streamingExtractor = new KafkaStreamingExtractor(state1);
   }
 
   @Test
@@ -48,17 +51,17 @@ public class KafkaStreamingExtractorTest {
     MultiLongWatermark highWatermark1 = new MultiLongWatermark(this.streamingExtractor.highWatermark);
 
     //Read 3 records
-    this.streamingExtractor.readStreamEntityImpl();
+    StreamEntity<DecodeableKafkaRecord> streamEntity = this.streamingExtractor.readStreamEntityImpl();
     Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
     Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 0L);
     Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
 
-    streamingExtractor.readStreamEntityImpl();
+    this.streamingExtractor.readStreamEntityImpl();
     Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
     Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
     Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 0L);
 
-    streamingExtractor.readStreamEntityImpl();
+    this.streamingExtractor.readStreamEntityImpl();
     Assert.assertEquals(this.streamingExtractor.nextWatermark.get(0), 1L);
     Assert.assertEquals(this.streamingExtractor.nextWatermark.get(1), 1L);
     Assert.assertEquals(this.streamingExtractor.nextWatermark.get(2), 1L);
@@ -103,7 +106,7 @@ public class KafkaStreamingExtractorTest {
   }
 
   @Test
-  public void testGenerateAdditionalTagHelper() throws Exception {
+  public void testGenerateAdditionalTagHelper() {
     // Verifying that produce rate has been added.
     Map<KafkaPartition, Map<String, String>> result = this.streamingExtractor.getAdditionalTagsHelper();
     for (Map<String, String> entry: result.values()) {
@@ -111,6 +114,22 @@ public class KafkaStreamingExtractorTest {
     }
   }
 
+  @Test
+  public void testReadRecordEnvelopeImpl()
+      throws IOException {
+    WorkUnitState state = KafkaExtractorUtils.getWorkUnitState("testTopic", numPartitions);
+    state.setProp(FlushingExtractor.FLUSH_DATA_PUBLISHER_CLASS, TestDataPublisher.class.getName());
+    //Enable config that allows underlying KafkaConsumerClient to return null-valued Kafka records.
+    state.setProp(KafkaStreamTestUtils.MockKafkaConsumerClient.CAN_RETURN_NULL_VALUED_RECORDS, "true");
+    KafkaStreamingExtractor streamingExtractorWithNulls = new KafkaStreamingExtractor(state);
+
+    //Extract 4 records. Ensure each record returned by readRecordEnvelopeImpl() is a non-null valued record.
+    for (int i = 0; i < 4; i++) {
+      RecordEnvelope<DecodeableKafkaRecord> recordEnvelope = streamingExtractorWithNulls.readRecordEnvelopeImpl();
+      Assert.assertNotNull(recordEnvelope.getRecord().getValue() != null);
+    }
+  }
+
   static class TestDataPublisher extends DataPublisher {
     public TestDataPublisher(WorkUnitState state) {
       super(state);
@@ -131,8 +150,7 @@ public class KafkaStreamingExtractorTest {
     }
 
     @Override
-    public void close()
-        throws IOException {
+    public void close() {
 
     }
   }