Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/10/20 14:38:34 UTC

[GitHub] [kafka] rhauch commented on a change in pull request #9359: kafka-10273 Connect Converters should produce actionable error messages

rhauch commented on a change in pull request #9359:
URL: https://github.com/apache/kafka/pull/9359#discussion_r508557355



##########
File path: connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
##########
@@ -79,7 +79,7 @@ public void configure(Map<String, ?> configs, boolean isKey) {
         try {
             return serializer.serialize(topic, value == null ? null : value.toString());
         } catch (SerializationException e) {
-            throw new DataException("Failed to serialize to a string: ", e);
+            throw new DataException(this.getClass() + " Failed to serialize to a string: ", e);

Review comment:
       Concatenating `this.getClass()` into the string calls `Class.toString()`, which will output a message like:
   ```
   class org.apache.kafka.connect.storage.StringConverter Failed to serialize to a string: ...
   ```
   First, the exception message now begins with `class`, which is not capitalized even though it starts the sentence, and `Failed` should not be capitalized in the middle of a sentence. Plus, I'm unconvinced that including the package name is helpful.
   
   But more importantly, I don't think we should change these exception messages in any of the converters, because we'd have to change them in all converter implementations to remain consistent. While we could do that for AK-provided converters, there are lots of other converters that may have modeled their error messages after these, and would then become inconsistent. Plus, why change all of them when we're catching the exception in the WorkerSinkTask and WorkerSourceTask? Wouldn't it make more sense to change it there so that we don't have to change any converters?
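
To make the first point concrete, here is a minimal sketch comparing the two ways of putting a class name into a message. The class `ClassNameInMessage`, its two helper methods, and the reworded second message are hypothetical; a `StringBuilder` stands in for a converter instance so the example needs nothing beyond the JDK.

```java
public class ClassNameInMessage {

    // Mirrors the pattern in the diff: concatenating a Class object
    // invokes Class.toString(), which yields "class <fully.qualified.Name>".
    public static String withClassToString(Object converter) {
        return converter.getClass() + " Failed to serialize to a string: ";
    }

    // Hypothetical alternative wording: getSimpleName() drops the
    // "class " prefix and the package name.
    public static String withSimpleName(Object converter) {
        return "Failed to serialize to a string using "
                + converter.getClass().getSimpleName() + ": ";
    }

    public static void main(String[] args) {
        // StringBuilder is only a stand-in for a converter instance.
        Object standIn = new StringBuilder();
        System.out.println(withClassToString(standIn));
        System.out.println(withSimpleName(standIn));
    }
}
```

The first form leads with the lowercase word `class` and the full package, while the second keeps only the short class name inside an otherwise normal sentence.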
   

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -531,19 +532,25 @@ private SchemaAndValue convertKey(ConsumerRecord<byte[], byte[]> msg) {
         try {
             return keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key());
         } catch (Exception e) {
-            log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}: {}",
-                    this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage(), e);
-            throw e;
+            String errorMessage = "Error while deserializing the key for record in topic %s, partition %s, timestamp %s, and at offset %s. " +
+                    "Check the key.converter and key.converter.* settings in the connector configuration, " +
+                    "and ensure that the converter matches the converter/serializer used by the application that produced this record. " +
+                    "Underlying converter error: %s";
+            log.error(String.format(errorMessage, msg.topic(), msg.partition(), msg.timestamp(), msg.offset(), e.getMessage()), e);
+            throw new DataException(String.format(errorMessage, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage()), e);

Review comment:
       Why not create a single formatted string and use it for both the log and exception messages? Also, would it be useful to have the short name of the converter class in the message as well? Maybe something like:
   ```suggestion
               String errorMessage = String.format("Error while deserializing the key with %s for record in topic '%s', partition %s (timestamp %s) at offset %s. " +
                       "Check the 'key.converter' and 'key.converter.*' settings in the connector configuration, " +
                       "and ensure that the converter matches the converter/serializer used by the application that produced this record. " +
                       "Underlying converter error: %s", keyConverter.getClass().getSimpleName(), msg.topic(), msg.partition(), msg.timestamp(), msg.offset(), e.getMessage());
               log.error(errorMessage, e);
               throw new DataException(errorMessage, e);
   ```
   Note the single quotes and slight rewording.
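
As a self-contained illustration of the "format once, use twice" idea, the sketch below builds the message a single time and passes the same string to both the log and the exception, so the argument order cannot drift between the two calls. Everything here is an assumption made for illustration: `ConversionErrorDemo`, the in-memory `LOG` list standing in for a logger, and the local `DataException` standing in for the Connect exception class. The message wording is shortened.

```java
import java.util.ArrayList;
import java.util.List;

public class ConversionErrorDemo {

    // Hypothetical stand-in for Kafka Connect's DataException.
    static class DataException extends RuntimeException {
        DataException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical stand-in for a logger, so the sketch has no dependencies.
    static final List<String> LOG = new ArrayList<>();

    // Each placeholder has exactly one argument, listed in the same order.
    public static String keyErrorMessage(String converterName, String topic, int partition,
                                         long timestamp, long offset, Throwable cause) {
        return String.format(
                "Error while deserializing the key with %s for record in topic '%s', "
                        + "partition %d (timestamp %d) at offset %d. "
                        + "Check the 'key.converter' and 'key.converter.*' settings. "
                        + "Underlying converter error: %s",
                converterName, topic, partition, timestamp, offset, cause.getMessage());
    }

    // Formats the message once, then reuses it for the log entry and the exception.
    public static void failKey(String converterName, String topic, int partition,
                               long timestamp, long offset, Throwable cause) {
        String errorMessage = keyErrorMessage(converterName, topic, partition, timestamp, offset, cause);
        LOG.add(errorMessage);
        throw new DataException(errorMessage, cause);
    }

    public static void main(String[] args) {
        try {
            failKey("StringConverter", "orders", 3, 100L, 42L,
                    new IllegalStateException("bad bytes"));
        } catch (DataException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the string is formatted in one place, a mismatch such as swapping offset and timestamp in only one of the two calls is no longer possible.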

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##########
@@ -531,19 +532,25 @@ private SchemaAndValue convertKey(ConsumerRecord<byte[], byte[]> msg) {
         try {
             return keyConverter.toConnectData(msg.topic(), msg.headers(), msg.key());
         } catch (Exception e) {
-            log.error("{} Error converting message key in topic '{}' partition {} at offset {} and timestamp {}: {}",
-                    this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage(), e);
-            throw e;
+            String errorMessage = "Error while deserializing the key for record in topic %s, partition %s, timestamp %s, and at offset %s. " +
+                    "Check the key.converter and key.converter.* settings in the connector configuration, " +
+                    "and ensure that the converter matches the converter/serializer used by the application that produced this record. " +
+                    "Underlying converter error: %s";
+            log.error(String.format(errorMessage, msg.topic(), msg.partition(), msg.timestamp(), msg.offset(), e.getMessage()), e);
+            throw new DataException(String.format(errorMessage, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage()), e);
         }
     }
 
     private SchemaAndValue convertValue(ConsumerRecord<byte[], byte[]> msg) {
         try {
             return valueConverter.toConnectData(msg.topic(), msg.headers(), msg.value());
         } catch (Exception e) {
-            log.error("{} Error converting message value in topic '{}' partition {} at offset {} and timestamp {}: {}",
-                    this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage(), e);
-            throw e;
+            String errorMessage = "Error while deserializing the value for record in topic %s, partition %s, timestamp %s, and at offset %s. " +
+                    "Check the value.converter and value.converter.* settings in the connector configuration, " +
+                    "and ensure that the converter matches the converter/serializer used by the application that produced this record. " +
+                    "Underlying converter error: %s";
+            log.error(String.format(errorMessage, msg.topic(), msg.partition(), msg.timestamp(), msg.offset(), e.getMessage()), e);
+            throw new DataException(String.format(errorMessage, msg.topic(), msg.partition(), msg.offset(), msg.timestamp(), e.getMessage()), e);

Review comment:
       Similar to above suggestion.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -602,6 +602,30 @@ private synchronized void finishSuccessfulFlush() {
         flushing = false;
     }
 
+    private byte[] convertKey(SourceRecord record, RecordHeaders headers) {
+        try {
+            return keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key());
+        } catch (Exception e) {
+            String errorMessage = "%s Error while serializing the key for a source record to topic: %s. Check the key.converter and key.converter.* " +
+                    "settings in the connector configuration, or in the worker configuration if the connector is inheriting the connector configuration. " +
+                    "Underlying converter error: %s";
+            log.error(String.format(errorMessage, this, record.topic(), e.getMessage()), e);
+            throw new RetriableException(String.format(errorMessage, this, record.topic(), e.getMessage()), e);
+        }
+    }
+
+    private byte[] convertValue(SourceRecord record, RecordHeaders headers) {
+        try {
+            return valueConverter.fromConnectData(record.topic(), headers, record.valueSchema(), record.value());
+        } catch (Exception e) {
+            String errorMessage = "%s Error while serializing the value for a source record to topic: %s. Check the value.converter and value.converter.* " +
+                    "settings in the connector configuration, or in the worker configuration if the connector is inheriting the connector configuration. " +
+                    "Underlying converter error: %s";
+            log.error(String.format(errorMessage, this, record.topic(), e.getMessage()), e);
+            throw new RetriableException(String.format(errorMessage, this, record.topic(), e.getMessage()), e);

Review comment:
       Similar to above suggestion.

##########
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##########
@@ -602,6 +602,30 @@ private synchronized void finishSuccessfulFlush() {
         flushing = false;
     }
 
+    private byte[] convertKey(SourceRecord record, RecordHeaders headers) {
+        try {
+            return keyConverter.fromConnectData(record.topic(), headers, record.keySchema(), record.key());
+        } catch (Exception e) {
+            String errorMessage = "%s Error while serializing the key for a source record to topic: %s. Check the key.converter and key.converter.* " +
+                    "settings in the connector configuration, or in the worker configuration if the connector is inheriting the connector configuration. " +
+                    "Underlying converter error: %s";
+            log.error(String.format(errorMessage, this, record.topic(), e.getMessage()), e);
+            throw new RetriableException(String.format(errorMessage, this, record.topic(), e.getMessage()), e);

Review comment:
       Similar to above suggestion.



