Posted to commits@pinot.apache.org by GitBox <gi...@apache.org> on 2022/07/20 21:40:32 UTC

[GitHub] [pinot] sajjad-moradi commented on a diff in pull request #9051: Handle unknown magic byte error in Confluent Avro decoder (#9045)

sajjad-moradi commented on code in PR #9051:
URL: https://github.com/apache/pinot/pull/9051#discussion_r926077254


##########
pinot-plugins/pinot-input-format/pinot-confluent-avro/src/main/java/org/apache/pinot/plugin/inputformat/avro/confluent/KafkaConfluentSchemaRegistryAvroMessageDecoder.java:
##########
@@ -103,12 +107,37 @@ public void init(Map<String, String> props, Set<String> fieldsToRead, String top
 
   @Override
   public GenericRow decode(byte[] payload, GenericRow destination) {
-    Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload);
-    return _avroRecordExtractor.extract(avroRecord, destination);
+    try {
+      Record avroRecord = (Record) _deserializer.deserialize(_topicName, payload);
+      return _avroRecordExtractor.extract(avroRecord, destination);
+    } catch (RuntimeException e) {
+      ignoreOrRethrowException(e);
+      return null;
+    }
   }
 
   @Override
   public GenericRow decode(byte[] payload, int offset, int length, GenericRow destination) {
     return decode(Arrays.copyOfRange(payload, offset, offset + length), destination);
   }
+
+  /**
+   * This method handles specific serialisation exceptions. If the exception cannot be ignored the method
+   * re-throws the exception.
+   *
+   * @param e exception to handle
+   */
+  private void ignoreOrRethrowException(RuntimeException e) {
+    if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
+      // Do nothing, the message is not an Avro message and can't be decoded
+      LOGGER.error("Caught exception while decoding row in topic {}, discarding row", _topicName, e);
+      return;

Review Comment:
   What do you think about incrementing a counter metric here? People don't normally read through the logs, but they can get alerts on metrics.
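
The reviewer's suggestion can be sketched as follows. This is a minimal illustration that assumes nothing about Pinot's actual metrics API: a `LongAdder` stands in for a real meter, and `DiscardCountingDecoder`, `decodeFn` and `discardedRowCount()` are hypothetical names. The `isUnknownMagicByte` helper is also an assumption. It matches on the "Unknown magic byte!" message that Confluent's deserializer uses, because the PR's own implementation is not shown in the diff.

```java
import java.util.concurrent.atomic.LongAdder;
import java.util.function.Function;

/**
 * Sketch: count discarded rows in addition to logging them.
 * All names here are hypothetical, not Pinot APIs.
 */
class DiscardCountingDecoder<T> {
  // Hypothetical stand-in for a real metrics meter that an alert could watch.
  private final LongAdder _discardedRows = new LongAdder();
  // Stand-in for "deserialize the payload, then extract the row".
  private final Function<byte[], T> _decodeFn;

  DiscardCountingDecoder(Function<byte[], T> decodeFn) {
    _decodeFn = decodeFn;
  }

  /** Returns the decoded row, or null if the payload was discarded as non-Avro. */
  T decode(byte[] payload) {
    try {
      return _decodeFn.apply(payload);
    } catch (RuntimeException e) {
      // Same shape as the PR: check the exception and its cause.
      if (isUnknownMagicByte(e) || isUnknownMagicByte(e.getCause())) {
        // The existing LOGGER.error(...) call would stay here; logging is omitted in this sketch.
        _discardedRows.increment();
        return null;
      }
      throw e; // any other failure is still propagated
    }
  }

  long discardedRowCount() {
    return _discardedRows.sum();
  }

  // Assumption: the error is recognised by the "Unknown magic byte!" message text.
  private static boolean isUnknownMagicByte(Throwable t) {
    return t != null && t.getMessage() != null && t.getMessage().contains("Unknown magic byte");
  }
}
```

A `LongAdder` is used rather than an `AtomicLong` in case several consuming threads share the decoder and the total is only read occasionally. In Pinot itself the count would presumably be reported through the server's metrics registry instead, so that an alert can fire when it starts growing.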


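For context on what the "unknown magic byte" error means: Confluent's Schema Registry serializers prefix every message with a magic byte (`0`) followed by a 4-byte big-endian schema ID, and the deserializer rejects payloads that don't start with that byte, for example plain JSON written to the same topic. The sketch below checks that prefix directly. `ConfluentWireFormat`, `hasConfluentHeader` and `schemaId` are hypothetical helpers, not part of Pinot or the Confluent client, and pre-checking the header is an alternative technique to the PR's approach of catching the exception.

```java
import java.nio.ByteBuffer;

/**
 * Sketch of the Confluent wire-format prefix: byte 0 is the magic byte (0),
 * bytes 1-4 are the big-endian schema ID, and the rest is the Avro-encoded record.
 * Hypothetical helper, not part of Pinot or Confluent.
 */
final class ConfluentWireFormat {
  static final byte MAGIC_BYTE = 0x0;
  static final int HEADER_LENGTH = 5; // 1 magic byte + 4-byte schema ID

  private ConfluentWireFormat() {
  }

  /** True if the payload starts with the magic byte and is long enough to hold a schema ID. */
  static boolean hasConfluentHeader(byte[] payload) {
    return payload != null && payload.length >= HEADER_LENGTH && payload[0] == MAGIC_BYTE;
  }

  /** Reads the schema ID from bytes 1-4; rejects payloads without a valid header. */
  static int schemaId(byte[] payload) {
    if (!hasConfluentHeader(payload)) {
      throw new IllegalArgumentException("Unknown magic byte or truncated header");
    }
    // wrap(array, 1, 4) positions the buffer at byte 1; ByteBuffer is big-endian by default.
    return ByteBuffer.wrap(payload, 1, 4).getInt();
  }
}
```

A pre-check like this avoids the cost of throwing, but a leading zero byte does not prove that the rest of the payload is valid Avro, so the try/catch in `decode` would still be needed.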

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


For additional commands, e-mail: commits-help@pinot.apache.org