You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/21 04:05:00 UTC

[incubator-gobblin] branch master updated: [GOBBLIN-738] Open a way to customize decoding KafkaConsumerRecord

This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 624fde8  [GOBBLIN-738] Open a way to customize decoding KafkaConsumerRecord
624fde8 is described below

commit 624fde8a7fa29d91db00c164864470239928b35a
Author: zhchen <zh...@linkedin.com>
AuthorDate: Sat Apr 20 21:04:55 2019 -0700

    [GOBBLIN-738] Open a way to customize decoding KafkaConsumerRecord
    
    Closes #2605 from zxcware/offset
---
 .../extractor/extract/kafka/KafkaExtractor.java    | 40 +++++++++++++---------
 1 file changed, 24 insertions(+), 16 deletions(-)

diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 0a4685f..3ca24bb 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -211,25 +211,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
 
         this.nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
         try {
-          D record = null;
           // track time for decode/convert depending on the record type
           long decodeStartTime = System.nanoTime();
 
-          if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
-            record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
-          } else if (nextValidMessage instanceof DecodeableKafkaRecord){
-            // if value is null then this is a bad record that is returned for further error handling, so raise an error
-            if (((DecodeableKafkaRecord) nextValidMessage).getValue() == null) {
-              throw new DataRecordException("Could not decode Kafka record");
-            }
-
-            // get value from decodeable record and convert to the output schema if necessary
-            record = convertRecord(((DecodeableKafkaRecord<?, D>) nextValidMessage).getValue());
-          } else {
-            throw new IllegalStateException(
-                "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
-                    + " or DecodeableKafkaRecord");
-          }
+          D record = decodeKafkaMessage(nextValidMessage);
 
           this.currentPartitionDecodeRecordTime += System.nanoTime() - decodeStartTime;
           this.currentPartitionRecordCount++;
@@ -253,6 +238,29 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
     return null;
   }
 
+  protected D decodeKafkaMessage(KafkaConsumerRecord message) throws DataRecordException, IOException {
+
+    D record = null;
+
+    if (message instanceof ByteArrayBasedKafkaRecord) {
+      record = decodeRecord((ByteArrayBasedKafkaRecord)message);
+    } else if (message instanceof DecodeableKafkaRecord){
+      // A null value marks a bad record that was returned for further error handling, so raise an error here
+      if (((DecodeableKafkaRecord) message).getValue() == null) {
+        throw new DataRecordException("Could not decode Kafka record");
+      }
+
+      // Get the value from the DecodeableKafkaRecord and convert it to the output schema if necessary
+      record = convertRecord(((DecodeableKafkaRecord<?, D>) message).getValue());
+    } else {
+      throw new IllegalStateException(
+          "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
+              + " or DecodeableKafkaRecord");
+    }
+
+    return record;
+  }
+
   @Override
   public void shutdown()
       throws JobShutdownException {