Posted to jira@kafka.apache.org by "mjsax (via GitHub)" <gi...@apache.org> on 2023/04/12 06:49:04 UTC

[GitHub] [kafka] mjsax commented on a diff in pull request #13477: KAFKA-7499: Handle serialization error in ProductionExceptionHandler

mjsax commented on code in PR #13477:
URL: https://github.com/apache/kafka/pull/13477#discussion_r1163683492


##########
streams/src/main/java/org/apache/kafka/streams/errors/ProductionExceptionHandler.java:
##########
@@ -34,6 +35,18 @@ public interface ProductionExceptionHandler extends Configurable {
     ProductionExceptionHandlerResponse handle(final ProducerRecord<byte[], byte[]> record,
                                               final Exception exception);
 
+    /**
+     * Handles serialization exception and determine if the process should continue. The default implementation is to
+     * fail the process.
+     *
+     * @param record        the record that failed to serialize
+     * @param exception     the exception that occurred during serialization
+     */
+    default ProductionExceptionHandlerResponse onSerializationException(final ProducerRecord record,

Review Comment:
   The KIP says that we would add
   ```
   handleSerializationException(ProducerRecord record, Exception exception); 
   ```
   
   We need to stick to the KIP, or alter the KIP. But why would we need to alter the KIP?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -213,6 +212,31 @@ public <K, V> void send(final String topic,
                     keyClass,
                     valueClass),
                 exception);
+        } catch (final SerializationException exception) {
+            final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
+            final ProductionExceptionHandler.ProductionExceptionHandlerResponse response;
+            try {
+                response = productionExceptionHandler.onSerializationException(record, exception);
+            } catch (final Exception e) {
+                log.error("Fatal handling serialization exception on record {}", record, e);
+                recordSendError(topic, e, null);
+                return;
+            }
+
+            if (response == ProductionExceptionHandlerResponse.FAIL) {
+                log.error("Fatal handling serialization exception on record {}", record, exception);
+                recordSendError(topic, exception, null);
+                return;
+            }
+
+            log.warn("Unable to serialize {}. Continue processing. " +
+                    "ProducerRecord(key=[{}], value=[{}], topic=[{}], partition=[{}], timestamp=[{}])",
+                keyBytes == null ? "key" : "value",

Review Comment:
   Even if `keyBytes` is `null`, it's not clear if key or value failed -- the key could have been `null`.
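
   One possible way to make the failing part explicit is to serialize key and value in separate steps and tag each failure with its origin. A minimal sketch, assuming plain `Function` objects stand in for the serializers; `Origin`, `OriginAwareException`, and `serialize` are hypothetical names for illustration, not part of this PR or of Kafka:

   ```java
   import java.nio.charset.StandardCharsets;
   import java.util.function.Function;

   class SerializationOriginSketch {
       /** Hypothetical marker for the part of the record that failed. */
       enum Origin { KEY, VALUE }

       /** Hypothetical wrapper exception; not part of the Kafka API. */
       static class OriginAwareException extends RuntimeException {
           final Origin origin;

           OriginAwareException(final Origin origin, final Throwable cause) {
               super("Unable to serialize " + origin, cause);
               this.origin = origin;
           }
       }

       /** Runs one serializer and tags any failure with its origin. */
       static <T> byte[] serialize(final Origin origin,
                                   final Function<T, byte[]> serializer,
                                   final T data) {
           try {
               return serializer.apply(data);
           } catch (final RuntimeException e) {
               throw new OriginAwareException(origin, e);
           }
       }

       public static void main(final String[] args) {
           final Function<String, byte[]> ok =
               s -> s == null ? null : s.getBytes(StandardCharsets.UTF_8);
           final Function<String, byte[]> broken =
               s -> { throw new IllegalStateException("boom"); };
           try {
               serialize(Origin.KEY, ok, null);       // a null key legitimately yields null bytes
               serialize(Origin.VALUE, broken, "v");  // the value serializer is the one that fails
           } catch (final OriginAwareException e) {
               System.out.println("failed part: " + e.origin);
           }
       }
   }
   ```

   With something like this, the log message could name the failing part without inspecting `keyBytes` at all.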



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -213,6 +212,31 @@ public <K, V> void send(final String topic,
                     keyClass,
                     valueClass),
                 exception);
+        } catch (final SerializationException exception) {
+            final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
+            final ProductionExceptionHandler.ProductionExceptionHandlerResponse response;
+            try {
+                response = productionExceptionHandler.onSerializationException(record, exception);
+            } catch (final Exception e) {
+                log.error("Fatal handling serialization exception on record {}", record, e);
+                recordSendError(topic, e, null);
+                return;
+            }
+
+            if (response == ProductionExceptionHandlerResponse.FAIL) {
+                log.error("Fatal handling serialization exception on record {}", record, exception);
+                recordSendError(topic, exception, null);
+                return;
+            }
+
+            log.warn("Unable to serialize {}. Continue processing. " +
+                    "ProducerRecord(key=[{}], value=[{}], topic=[{}], partition=[{}], timestamp=[{}])",

Review Comment:
   As above -- we should not log key/value.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -213,6 +212,31 @@ public <K, V> void send(final String topic,
                     keyClass,
                     valueClass),
                 exception);
+        } catch (final SerializationException exception) {
+            final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
+            final ProductionExceptionHandler.ProductionExceptionHandlerResponse response;
+            try {
+                response = productionExceptionHandler.onSerializationException(record, exception);
+            } catch (final Exception e) {
+                log.error("Fatal handling serialization exception on record {}", record, e);

Review Comment:
   We should never log `record` (maybe except for TRACE level), because it might leak data and is a security concern.
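
   To illustrate what a log-safe message could look like, here is a minimal sketch that formats only the record metadata and never touches key or value. `Rec` is a hypothetical stand-in for the producer record and `describe` is a hypothetical helper; neither exists in the PR:

   ```java
   class SafeRecordLogSketch {
       /** Hypothetical stand-in for a producer record; not the Kafka class. */
       static final class Rec {
           final String topic;
           final Integer partition;
           final Long timestamp;
           final Object key;
           final Object value;

           Rec(final String topic, final Integer partition, final Long timestamp,
               final Object key, final Object value) {
               this.topic = topic;
               this.partition = partition;
               this.timestamp = timestamp;
               this.key = key;
               this.value = value;
           }
       }

       /** Builds a description from metadata only; key and value are never read. */
       static String describe(final Rec record) {
           return String.format("topic=[%s], partition=[%s], timestamp=[%s]",
               record.topic, record.partition, record.timestamp);
       }

       public static void main(final String[] args) {
           final Rec record = new Rec("orders", 0, 42L, "secret-key", "secret-value");
           System.out.println("Unable to serialize record. " + describe(record));
       }
   }
   ```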



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -213,6 +212,31 @@ public <K, V> void send(final String topic,
                     keyClass,
                     valueClass),
                 exception);
+        } catch (final SerializationException exception) {
+            final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
+            final ProductionExceptionHandler.ProductionExceptionHandlerResponse response;
+            try {
+                response = productionExceptionHandler.onSerializationException(record, exception);
+            } catch (final Exception e) {
+                log.error("Fatal handling serialization exception on record {}", record, e);
+                recordSendError(topic, e, null);
+                return;
+            }
+
+            if (response == ProductionExceptionHandlerResponse.FAIL) {
+                log.error("Fatal handling serialization exception on record {}", record, exception);

Review Comment:
   As above



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -213,6 +212,30 @@ public <K, V> void send(final String topic,
                     keyClass,
                     valueClass),
                 exception);
+        } catch (final SerializationException exception) {
+            final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
+            final ProductionExceptionHandler.ProductionExceptionHandlerResponse response;
+            try {
+                response = productionExceptionHandler.onSerializationException(record, exception);
+            } catch (final Exception e) {
+                log.error("Fatal handling serialization exception on record {}", record, e);
+                recordSendError(topic, e, null);
+                return;
+            }
+
+            if (response == ProductionExceptionHandlerResponse.FAIL) {
+                recordSendError(topic, exception, null);

Review Comment:
   Instead of calling `recordSendError`, should we just re-throw directly? Otherwise we might double log and, worse, even call `handle(...)`. It would be very confusing to call the handler twice for the same error (even via two different methods), and even worse, the handler could change its mind and return `CONTINUE` on the second call.
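
   A rough sketch of the control flow I have in mind, where the serialization callback is consulted exactly once and `FAIL` leads to a direct re-throw. `Handler` and `Response` are simplified stand-ins for the real types, and `IllegalStateException` is only a placeholder for whatever fatal exception the real code would throw:

   ```java
   import java.util.concurrent.atomic.AtomicInteger;

   class SingleHandlerCallSketch {
       enum Response { CONTINUE, FAIL }

       /** Hypothetical stand-in for the handler; only the two callbacks matter here. */
       interface Handler {
           Response handle(Exception exception);
           Response handleSerializationException(Exception exception);
       }

       /** Consults the serialization callback once; on FAIL, re-throws with no further handler call. */
       static void onSerializationFailure(final Handler handler, final RuntimeException cause) {
           final Response response = handler.handleSerializationException(cause);
           if (response == Response.FAIL) {
               // Placeholder exception type; the real code would pick its own fatal exception.
               throw new IllegalStateException("Fatal error while handling serialization exception", cause);
           }
           // CONTINUE: the caller skips the record.
       }

       public static void main(final String[] args) {
           final AtomicInteger handleCalls = new AtomicInteger();
           final AtomicInteger serializationCalls = new AtomicInteger();
           final Handler failing = new Handler() {
               @Override
               public Response handle(final Exception e) {
                   handleCalls.incrementAndGet();
                   return Response.CONTINUE;
               }

               @Override
               public Response handleSerializationException(final Exception e) {
                   serializationCalls.incrementAndGet();
                   return Response.FAIL;
               }
           };
           try {
               onSerializationFailure(failing, new RuntimeException("bad payload"));
           } catch (final IllegalStateException e) {
               System.out.println("handle calls: " + handleCalls.get()
                   + ", serialization calls: " + serializationCalls.get());
           }
       }
   }
   ```

   Because `handle(...)` is never reached on this path, the handler gets no second chance to answer differently for the same error.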



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -213,6 +212,31 @@ public <K, V> void send(final String topic,
                     keyClass,
                     valueClass),
                 exception);
+        } catch (final SerializationException exception) {
+            final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
+            final ProductionExceptionHandler.ProductionExceptionHandlerResponse response;
+            try {
+                response = productionExceptionHandler.onSerializationException(record, exception);
+            } catch (final Exception e) {
+                log.error("Fatal handling serialization exception on record {}", record, e);
+                recordSendError(topic, e, null);
+                return;
+            }
+
+            if (response == ProductionExceptionHandlerResponse.FAIL) {
+                log.error("Fatal handling serialization exception on record {}", record, exception);
+                recordSendError(topic, exception, null);
+                return;
+            }

Review Comment:
   If we drop a record, we should also record that via `droppedRecordsSensor.record()` (cf. `recordSendError(...)`).
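
   Roughly, the `CONTINUE` path would bump the dropped-records metric before returning. A minimal sketch, assuming a `LongAdder` as a stand-in for the sensor; `Collector` and its members are hypothetical names, not the actual `RecordCollectorImpl` code:

   ```java
   import java.util.concurrent.atomic.LongAdder;

   class DroppedRecordCountSketch {
       enum Response { CONTINUE, FAIL }

       /** Hypothetical, heavily simplified collector; the LongAdder stands in for the sensor. */
       static final class Collector {
           final LongAdder droppedRecords = new LongAdder();

           void onSerializationFailure(final Response response, final RuntimeException cause) {
               if (response == Response.FAIL) {
                   throw cause;              // fatal path: nothing is counted as dropped
               }
               droppedRecords.increment();   // CONTINUE path: the record is skipped, so count it
           }
       }

       public static void main(final String[] args) {
           final Collector collector = new Collector();
           collector.onSerializationFailure(Response.CONTINUE, new RuntimeException("bad payload"));
           System.out.println("dropped records: " + collector.droppedRecords.sum());
       }
   }
   ```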



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -213,6 +212,31 @@ public <K, V> void send(final String topic,
                     keyClass,
                     valueClass),
                 exception);
+        } catch (final SerializationException exception) {
+            final ProducerRecord<K, V> record = new ProducerRecord<>(topic, partition, timestamp, key, value, headers);
+            final ProductionExceptionHandler.ProductionExceptionHandlerResponse response;
+            try {
+                response = productionExceptionHandler.onSerializationException(record, exception);
+            } catch (final Exception e) {
+                log.error("Fatal handling serialization exception on record {}", record, e);

Review Comment:
   It seems we don't log the original `exception` in this case, which could be a problem. Maybe we should log it at least at DEBUG level?


