You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@gobblin.apache.org by hu...@apache.org on 2019/04/21 04:05:00 UTC
[incubator-gobblin] branch master updated: [GOBBLIN-738] Open a way
to customize decoding KafkaConsumerRecord
This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 624fde8 [GOBBLIN-738] Open a way to customize decoding KafkaConsumerRecord
624fde8 is described below
commit 624fde8a7fa29d91db00c164864470239928b35a
Author: zhchen <zh...@linkedin.com>
AuthorDate: Sat Apr 20 21:04:55 2019 -0700
[GOBBLIN-738] Open a way to customize decoding KafkaConsumerRecord
Closes #2605 from zxcware/offset
---
.../extractor/extract/kafka/KafkaExtractor.java | 40 +++++++++++++---------
1 file changed, 24 insertions(+), 16 deletions(-)
diff --git a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
index 0a4685f..3ca24bb 100644
--- a/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
+++ b/gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaExtractor.java
@@ -211,25 +211,10 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
this.nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset());
try {
- D record = null;
// track time for decode/convert depending on the record type
long decodeStartTime = System.nanoTime();
- if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) {
- record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage);
- } else if (nextValidMessage instanceof DecodeableKafkaRecord){
- // if value is null then this is a bad record that is returned for further error handling, so raise an error
- if (((DecodeableKafkaRecord) nextValidMessage).getValue() == null) {
- throw new DataRecordException("Could not decode Kafka record");
- }
-
- // get value from decodeable record and convert to the output schema if necessary
- record = convertRecord(((DecodeableKafkaRecord<?, D>) nextValidMessage).getValue());
- } else {
- throw new IllegalStateException(
- "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
- + " or DecodeableKafkaRecord");
- }
+ D record = decodeKafkaMessage(nextValidMessage);
this.currentPartitionDecodeRecordTime += System.nanoTime() - decodeStartTime;
this.currentPartitionRecordCount++;
@@ -253,6 +238,29 @@ public abstract class KafkaExtractor<S, D> extends EventBasedExtractor<S, D> {
return null;
}
+ protected D decodeKafkaMessage(KafkaConsumerRecord message) throws DataRecordException, IOException {
+
+ D record = null;
+
+ if (message instanceof ByteArrayBasedKafkaRecord) {
+ record = decodeRecord((ByteArrayBasedKafkaRecord)message);
+ } else if (message instanceof DecodeableKafkaRecord){
+ // if value is null then this is a bad record that is returned for further error handling, so raise an error
+ if (((DecodeableKafkaRecord) message).getValue() == null) {
+ throw new DataRecordException("Could not decode Kafka record");
+ }
+
+ // get value from decodeable record and convert to the output schema if necessary
+ record = convertRecord(((DecodeableKafkaRecord<?, D>) message).getValue());
+ } else {
+ throw new IllegalStateException(
+ "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord"
+ + " or DecodeableKafkaRecord");
+ }
+
+ return record;
+ }
+
@Override
public void shutdown()
throws JobShutdownException {