Posted to commits@kafka.apache.org by ew...@apache.org on 2018/05/22 04:10:40 UTC

[kafka] branch trunk updated: KAFKA-6685: Added Exception to distinguish message Key from Value during deserializing.

This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95b46a1  KAFKA-6685: Added Exception to distinguish message Key from Value during deserializing.
95b46a1 is described below

commit 95b46a12e5a74da2699b3472c639cc82b28cea96
Author: Jagadesh Adireddi <ad...@gmail.com>
AuthorDate: Mon May 21 21:10:17 2018 -0700

    KAFKA-6685: Added Exception to distinguish message Key from Value during deserializing.
    
    https://issues.apache.org/jira/browse/KAFKA-6685
    
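    Wrapped converter failures in `WorkerSinkTask.convertMessages` in a `ConnectException` whose message states whether the message key or the value failed to deserialize to Kafka Connect format, along with the topic, partition, offset and timestamp.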
    
    Author: Jagadesh Adireddi <ad...@gmail.com>
    
    Reviewers: Randall Hauch <rh...@gmail.com>, Ewen Cheslack-Postava <ew...@confluent.io>
    
    Closes #4765 from jadireddi/KAFKA-6685---log-message-should-distinguish-key-from-value
---
 .../org/apache/kafka/connect/runtime/WorkerSinkTask.java   | 14 ++++++++++++--
 1 file changed, 12 insertions(+), 2 deletions(-)

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 6edcfd4..e629798 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -477,8 +477,8 @@ class WorkerSinkTask extends WorkerTask {
         for (ConsumerRecord<byte[], byte[]> msg : msgs) {
             log.trace("{} Consuming and converting message in topic '{}' partition {} at offset {} and timestamp {}",
                     this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
-            SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
-            SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
+            SchemaAndValue keyAndSchema = toConnectData(keyConverter, "key", msg, msg.key());
+            SchemaAndValue valueAndSchema = toConnectData(valueConverter, "value", msg, msg.value());
             Headers headers = convertHeadersFor(msg);
             Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
             SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
@@ -505,6 +505,16 @@ class WorkerSinkTask extends WorkerTask {
         sinkTaskMetricsGroup.recordConsumedOffsets(origOffsets);
     }
 
+    private SchemaAndValue toConnectData(Converter converter, String converterName, ConsumerRecord<byte[], byte[]> msg, byte[] data) {
+        try {
+            return converter.toConnectData(msg.topic(), data);
+        } catch (Throwable e) {
+            String str = String.format("Error converting message %s in topic '%s' partition %d at offset %d and timestamp %d",
+                    converterName, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
+            throw new ConnectException(str, e);
+        }
+    }
+
     private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
         Headers result = new ConnectHeaders();
         org.apache.kafka.common.header.Headers recordHeaders = record.headers();
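
The wrapping pattern added by this commit can be tried outside Kafka with the minimal sketch below. It is not the committed code: `Converter`, `ConversionException` and `RecordInfo` are hypothetical stand-ins for Kafka Connect's `Converter`, `ConnectException` and the `ConsumerRecord` coordinates, and the `STRICT` converter, topic name, offset and timestamp are made up for illustration. Only the try/catch shape and the message format follow the diff above.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical, dependency-free stand-ins for the Kafka Connect types in the diff.
class ConversionErrorSketch {

    /** Stand-in for Kafka Connect's Converter (only the one method used here). */
    interface Converter {
        Object toConnectData(String topic, byte[] data);
    }

    /** Stand-in for ConnectException. */
    static class ConversionException extends RuntimeException {
        ConversionException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Stand-in for the coordinates carried by a ConsumerRecord. */
    static class RecordInfo {
        final String topic;
        final int partition;
        final long offset;
        final long timestamp;

        RecordInfo(String topic, int partition, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Same shape as the helper in the diff: convert, and on failure name the part that failed. */
    static Object toConnectData(Converter converter, String converterName, RecordInfo msg, byte[] data) {
        try {
            return converter.toConnectData(msg.topic, data);
        } catch (Throwable e) { // mirrors the commit, which catches Throwable
            String str = String.format(
                    "Error converting message %s in topic '%s' partition %d at offset %d and timestamp %d",
                    converterName, msg.topic, msg.partition, msg.offset, msg.timestamp);
            throw new ConversionException(str, e);
        }
    }

    /** Toy converter (an assumption): accepts UTF-8 text starting with '{', rejects everything else. */
    static final Converter STRICT = (topic, data) -> {
        String text = new String(data, StandardCharsets.UTF_8);
        if (!text.startsWith("{")) {
            throw new IllegalArgumentException("not JSON: " + text);
        }
        return text;
    };

    /** Converts key then value for one made-up record; returns "ok" or the wrapped error message. */
    static String describeFailure(byte[] key, byte[] value) {
        RecordInfo msg = new RecordInfo("orders", 3, 42L, 1526962217000L);
        try {
            toConnectData(STRICT, "key", msg, key);
            toConnectData(STRICT, "value", msg, value);
            return "ok";
        } catch (ConversionException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        byte[] goodKey = "{}".getBytes(StandardCharsets.UTF_8);
        byte[] badValue = "oops".getBytes(StandardCharsets.UTF_8);
        // The key converts; the value does not, and the message says so.
        System.out.println(describeFailure(goodKey, badValue));
    }
}
```

With a valid key and an invalid value, the message names "value" rather than "key", which is the distinction the commit is after. The original exception is kept as the cause, so the converter's own stack trace is still available.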
