Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/06/11 01:46:00 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #7496: KAFKA-9018: Throw clearer exceptions on serialisation errors

rhauch commented on a change in pull request #7496:
URL: https://github.com/apache/kafka/pull/7496#discussion_r438497632



##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -507,6 +508,18 @@ private SinkRecord convertAndTransformRecord(final ConsumerRecord<byte[], byte[]
         return transformationChain.apply(origRecord);
     }
 
+    private SchemaAndValue convertKeyValue(ConsumerRecord<byte[], byte[]> msg, boolean isKey) {
+        try {
+            byte[] value = isKey ? msg.key() : msg.value();
+            Converter converter = isKey ? keyConverter : valueConverter;
+            return converter.toConnectData(msg.topic(), msg.headers(), value);
+        } catch (Exception e) {
+            log.error("Error converting message {} in topic '{}' partition {} at offset {}",
+                    isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName(), msg.topic(), msg.partition(), msg.offset());
+            throw e;
+        }
+    }
+

Review comment:
       Since the calling code already knows whether it's converting a key or a value, how about just having separate methods? Yeah, they'd be mostly the same, but we could drop the superfluous `isKey` logic and simplify things a bit.
   
   Also, would it be better to wrap the exception rather than just log the error? Especially with the retry operator, the exception may not get logged anywhere near this log message, so we'd lose the correlation between the two.
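
   A rough sketch of what both suggestions could look like together, written against stand-in types so that it compiles on its own. `SimpleConverter`, `RawRecord`, `ConversionException` and `describe` are hypothetical names used only for illustration; they are not the Connect `Converter`, `ConsumerRecord`, or any existing exception class, and the real change would use whatever types `WorkerSinkTask` already has:

```java
import java.nio.charset.StandardCharsets;

public class ConversionSketch {

    /** Hypothetical stand-in for a converter; not the real Connect interface. */
    interface SimpleConverter {
        Object toConnectData(String topic, byte[] payload);
    }

    /** Hypothetical stand-in for a consumed record with raw key and value bytes. */
    static final class RawRecord {
        final String topic;
        final int partition;
        final long offset;
        final byte[] key;
        final byte[] value;

        RawRecord(String topic, int partition, long offset, byte[] key, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    /** Hypothetical wrapper exception that carries the record coordinates. */
    static final class ConversionException extends RuntimeException {
        ConversionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private final SimpleConverter keyConverter;
    private final SimpleConverter valueConverter;

    ConversionSketch(SimpleConverter keyConverter, SimpleConverter valueConverter) {
        this.keyConverter = keyConverter;
        this.valueConverter = valueConverter;
    }

    /** Converts the key; no isKey flag is needed because the caller picks the method. */
    Object convertKey(RawRecord msg) {
        try {
            return keyConverter.toConnectData(msg.topic, msg.key);
        } catch (Exception e) {
            // Wrap instead of log-and-rethrow, so the context travels with the exception.
            throw new ConversionException(describe("key", msg), e);
        }
    }

    /** Converts the value; mirrors convertKey on purpose. */
    Object convertValue(RawRecord msg) {
        try {
            return valueConverter.toConnectData(msg.topic, msg.value);
        } catch (Exception e) {
            throw new ConversionException(describe("value", msg), e);
        }
    }

    /** Builds the same message text the original log statement used. */
    private static String describe(String part, RawRecord msg) {
        return String.format("Error converting message %s in topic '%s' partition %d at offset %d",
                part, msg.topic, msg.partition, msg.offset);
    }

    public static void main(String[] args) {
        ConversionSketch sketch = new ConversionSketch(
                (topic, payload) -> new String(payload, StandardCharsets.UTF_8),
                (topic, payload) -> { throw new IllegalStateException("bad payload"); });
        RawRecord msg = new RawRecord("orders", 3, 42L,
                "k".getBytes(StandardCharsets.UTF_8), new byte[0]);

        System.out.println(sketch.convertKey(msg));
        try {
            sketch.convertValue(msg);
        } catch (ConversionException e) {
            System.out.println(e.getMessage() + " (cause: " + e.getCause() + ")");
        }
    }
}
```

   Because the topic, partition and offset are part of the wrapping exception's message, whoever eventually logs or reports the failure gets the context and the original cause in one place, even if that happens far from the conversion call.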




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org