Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/03/31 01:43:41 UTC

[GitHub] [kafka] hachikuji opened a new pull request #10445: KAFKA-12548; Propagate record error messages to application

hachikuji opened a new pull request #10445:
URL: https://github.com/apache/kafka/pull/10445


   KIP-467 added a field to the produce response that allows the broker to indicate which specific records failed validation. This patch adds the logic to propagate these record-level error messages up to the application.
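   
   As a sketch of what this should look like from the application side (the topic name, configs, and handling below are illustrative only, not part of this patch), a `send` callback can now see the broker's record-level error message rather than only a generic partition-level error:
   
   ```java
   import java.util.Properties;
   import org.apache.kafka.clients.producer.KafkaProducer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.InvalidRecordException;
   
   public class RecordErrorExample {
       public static void main(String[] args) {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
           props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
   
           try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
               ProducerRecord<String, String> record = new ProducerRecord<>("example-topic", "key", "value");
               producer.send(record, (metadata, exception) -> {
                   if (exception instanceof InvalidRecordException) {
                       // With this patch, the message carries the broker's record-level detail.
                       System.err.println("Record rejected by broker: " + exception.getMessage());
                   } else if (exception != null) {
                       System.err.println("Send failed: " + exception);
                   }
               });
           }
       }
   }
   ```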
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] hachikuji commented on a change in pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r605971389



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -181,20 +212,36 @@ public boolean isDone() {
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
-     * @param exception The exception that occurred (or null if the request was successful)
+     * @param topLevelException The exception that occurred (or null if the request was successful)
+     * @param recordExceptionMap Record exception map keyed by batchIndex which overrides `topLevelException`
+     *                           for records present in the map
      * @return true if the batch was completed successfully and false if the batch was previously aborted
      */
-    public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
-        final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
-
+    private boolean done(
+        long baseOffset,
+        long logAppendTime,
+        RuntimeException topLevelException,
+        Map<Integer, RuntimeException> recordExceptionMap
+    ) {
+        final FinalState tryFinalState = (topLevelException == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
         if (tryFinalState == FinalState.SUCCEEDED) {
             log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
         } else {
-            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception);
+            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, topLevelException);
         }
 
         if (this.finalState.compareAndSet(null, tryFinalState)) {
-            completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+            Function<Integer, RuntimeException> recordExceptions = null;
+            if (topLevelException != null) {
+                recordExceptions = batchIndex -> {
+                    if (recordExceptionMap == null) {
+                        return topLevelException;
+                    } else {
+                        return recordExceptionMap.getOrDefault(batchIndex, topLevelException);

Review comment:
       This seems related to your other question. The top-level (i.e. partition-level) error code is really the only one we get from the response; at the record level, all we have is an error message. The partition-level error should be either `InvalidRecordException` or `InvalidTimestampException`, based on the current implementation in `LogValidator`:
   ```scala
       if (recordErrors.nonEmpty) {
         val errors = recordErrors.map(_.recordError)
         if (recordErrors.exists(_.apiError == Errors.INVALID_TIMESTAMP)) {
           throw new RecordValidationException(new InvalidTimestampException(
             "One or more records have been rejected due to invalid timestamp"), errors)
         } else {
           throw new RecordValidationException(new InvalidRecordException(
             "One or more records have been rejected"), errors)
         }
       }
   ```
   The invalid timestamp case is the problematic one: we don't have a way to tell whether an individual record error is actually an instance of `InvalidTimestampException`.
   
   Given the current situation, here's what I'm thinking:
   
   1. If there are record errors present, the client will always raise them as InvalidRecordException. The best we can do is make sure the partition-level error gets logged so at least it ends up somewhere.
   2. If there are no record errors present, then we will raise the partition-level exception for each record.
   
   What do you think?
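   
   As a rough sketch of that mapping (a hypothetical helper for illustration only, not the actual `ProducerBatch`/`Sender` code; the class and parameter names here are made up):
   
   ```java
   import java.util.Map;
   import org.apache.kafka.common.InvalidRecordException;
   import org.apache.kafka.common.KafkaException;
   
   // Chooses the exception to raise for the record at batchIndex, following points 1 and 2 above.
   public final class ProposedRecordErrorMapping {
   
       public static RuntimeException exceptionForRecord(
           int batchIndex,
           RuntimeException partitionLevelException,   // e.g. InvalidTimestampException from the partition error code
           Map<Integer, String> recordErrorMessages    // batchIndex -> record-level error message from the response
       ) {
           if (recordErrorMessages == null || recordErrorMessages.isEmpty()) {
               // (2) No record-level errors: every record gets the partition-level exception.
               return partitionLevelException;
           }
           String message = recordErrorMessages.get(batchIndex);
           if (message != null) {
               // (1) Record-level errors are always raised as InvalidRecordException; the
               // partition-level error is only logged so it still ends up somewhere.
               return new InvalidRecordException(message);
           }
           // Records without their own message are the open case; the follow-up below settles
           // on a generic KafkaException for them (see KAFKA-12606).
           return new KafkaException(partitionLevelException.getMessage());
       }
   }
   ```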




[GitHub] [kafka] guozhangwang commented on pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#issuecomment-812753885


   LGTM. I'm fine to merge it as is and continue on https://issues.apache.org/jira/browse/KAFKA-12606


[GitHub] [kafka] hachikuji commented on pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
hachikuji commented on pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#issuecomment-812620642


   One of the test failures, `testSendWithInvalidCreateTime`, is due to this patch. I am thinking about how to address it.


[GitHub] [kafka] hachikuji commented on a change in pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r606034496



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -181,20 +212,36 @@ public boolean isDone() {
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
-     * @param exception The exception that occurred (or null if the request was successful)
+     * @param topLevelException The exception that occurred (or null if the request was successful)
+     * @param recordExceptionMap Record exception map keyed by batchIndex which overrides `topLevelException`
+     *                           for records present in the map
      * @return true if the batch was completed successfully and false if the batch was previously aborted
      */
-    public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
-        final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
-
+    private boolean done(
+        long baseOffset,
+        long logAppendTime,
+        RuntimeException topLevelException,
+        Map<Integer, RuntimeException> recordExceptionMap
+    ) {
+        final FinalState tryFinalState = (topLevelException == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
         if (tryFinalState == FinalState.SUCCEEDED) {
             log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
         } else {
-            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception);
+            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, topLevelException);
         }
 
         if (this.finalState.compareAndSet(null, tryFinalState)) {
-            completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+            Function<Integer, RuntimeException> recordExceptions = null;
+            if (topLevelException != null) {
+                recordExceptions = batchIndex -> {
+                    if (recordExceptionMap == null) {
+                        return topLevelException;
+                    } else {
+                        return recordExceptionMap.getOrDefault(batchIndex, topLevelException);

Review comment:
       I more or less implemented this. The only difference is that I decided to use a generic `KafkaException` for records which did not have a record error in the response. I opened https://issues.apache.org/jira/browse/KAFKA-12606 to come up with a better solution.




[GitHub] [kafka] ijuma commented on a change in pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r608208702



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
##########
@@ -107,7 +108,7 @@ Long checksumOrNull() {
     RecordMetadata value() {
         if (nextRecordMetadata != null)
             return nextRecordMetadata.value();
-        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
+        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.batchIndex,

Review comment:
       The argument in `RecordMetadata` is still `relativeOffset`. Is that intended? If so, it would be good to add a comment as it's a bit confusing.




[GitHub] [kafka] guozhangwang commented on a change in pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r605980981



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -181,20 +212,36 @@ public boolean isDone() {
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
-     * @param exception The exception that occurred (or null if the request was successful)
+     * @param topLevelException The exception that occurred (or null if the request was successful)
+     * @param recordExceptionMap Record exception map keyed by batchIndex which overrides `topLevelException`
+     *                           for records present in the map
      * @return true if the batch was completed successfully and false if the batch was previously aborted
      */
-    public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
-        final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
-
+    private boolean done(
+        long baseOffset,
+        long logAppendTime,
+        RuntimeException topLevelException,
+        Map<Integer, RuntimeException> recordExceptionMap
+    ) {
+        final FinalState tryFinalState = (topLevelException == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
         if (tryFinalState == FinalState.SUCCEEDED) {
             log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
         } else {
-            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception);
+            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, topLevelException);
         }
 
         if (this.finalState.compareAndSet(null, tryFinalState)) {
-            completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+            Function<Integer, RuntimeException> recordExceptions = null;
+            if (topLevelException != null) {
+                recordExceptions = batchIndex -> {
+                    if (recordExceptionMap == null) {
+                        return topLevelException;
+                    } else {
+                        return recordExceptionMap.getOrDefault(batchIndex, topLevelException);

Review comment:
       Sounds good, I like this one.




[GitHub] [kafka] ijuma commented on a change in pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
ijuma commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r608212051



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/FutureRecordMetadata.java
##########
@@ -107,7 +108,7 @@ Long checksumOrNull() {
     RecordMetadata value() {
         if (nextRecordMetadata != null)
             return nextRecordMetadata.value();
-        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.relativeOffset,
+        return new RecordMetadata(result.topicPartition(), this.result.baseOffset(), this.batchIndex,

Review comment:
       I can also add a comment as part of https://github.com/apache/kafka/pull/10470 if that's easier (that's how I ran into this). Looking at the code, it does seem like it's intended.




[GitHub] [kafka] guozhangwang commented on a change in pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r605867393



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -181,20 +212,36 @@ public boolean isDone() {
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
-     * @param exception The exception that occurred (or null if the request was successful)
+     * @param topLevelException The exception that occurred (or null if the request was successful)
+     * @param recordExceptionMap Record exception map keyed by batchIndex which overrides `topLevelException`
+     *                           for records present in the map
      * @return true if the batch was completed successfully and false if the batch was previously aborted
      */
-    public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
-        final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
-
+    private boolean done(
+        long baseOffset,
+        long logAppendTime,
+        RuntimeException topLevelException,
+        Map<Integer, RuntimeException> recordExceptionMap
+    ) {
+        final FinalState tryFinalState = (topLevelException == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
         if (tryFinalState == FinalState.SUCCEEDED) {
             log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
         } else {
-            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception);
+            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, topLevelException);
         }
 
         if (this.finalState.compareAndSet(null, tryFinalState)) {
-            completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+            Function<Integer, RuntimeException> recordExceptions = null;
+            if (topLevelException != null) {
+                recordExceptions = batchIndex -> {
+                    if (recordExceptionMap == null) {
+                        return topLevelException;
+                    } else {
+                        return recordExceptionMap.getOrDefault(batchIndex, topLevelException);

Review comment:
       Is it possible that the returned response has both a top-level exception and non-null record-level exceptions, and they are not the same? If so, should we let the record-level exception override the top-level exception here?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -689,30 +680,57 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
             transactionManager.handleCompletedBatch(batch, response);
         }
 
-        if (batch.done(response.baseOffset, response.logAppendTime, null)) {
+        if (batch.complete(response.baseOffset, response.logAppendTime)) {
             maybeRemoveAndDeallocateBatch(batch);
         }
     }
 
     private void failBatch(ProducerBatch batch,
                            ProduceResponse.PartitionResponse response,
-                           RuntimeException exception,
                            boolean adjustSequenceNumbers) {
-        failBatch(batch, response.baseOffset, response.logAppendTime, exception, adjustSequenceNumbers);
+        final RuntimeException topLevelException;
+        if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED)
+            topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
+        else if (response.error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+            topLevelException = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
+        else
+            topLevelException = response.error.exception(response.errorMessage);
+
+        if (response.recordErrors == null || response.recordErrors.isEmpty()) {
+            failBatch(batch, topLevelException, adjustSequenceNumbers);
+        } else {
+            Map<Integer, RuntimeException> recordErrorMap = new HashMap<>(response.recordErrors.size());
+            for (ProduceResponse.RecordError recordError : response.recordErrors) {
+                if (recordError.message != null) {
+                    recordErrorMap.put(recordError.batchIndex, response.error.exception(recordError.message));

Review comment:
       Hmm.. would we ever have customized error messages that are not from the `Errors` map? E.g. if pluggable modules record some non-AK errors.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -215,20 +262,25 @@ public boolean done(long baseOffset, long logAppendTime, RuntimeException except
         return false;
     }
 
-    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
+    private void completeFutureAndFireCallbacks(
+        long baseOffset,
+        long logAppendTime,
+        Function<Integer, RuntimeException> recordExceptions
+    ) {
         // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
-        produceFuture.set(baseOffset, logAppendTime, exception);
+        produceFuture.set(baseOffset, logAppendTime, recordExceptions);
 
         // execute callbacks
-        for (Thunk thunk : thunks) {
+        for (int i = 0; i < thunks.size(); i++) {
             try {
-                if (exception == null) {
+                Thunk thunk = thunks.get(i);
+                if (recordExceptions == null) {

Review comment:
       nit: not introduced in this PR, but why do we only check `if (thunk.callback != null)` in the first branch and not the second? Maybe we can move this check upfront, right after line 276.




[GitHub] [kafka] hachikuji commented on a change in pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
hachikuji commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r605879590



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -689,30 +680,57 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
             transactionManager.handleCompletedBatch(batch, response);
         }
 
-        if (batch.done(response.baseOffset, response.logAppendTime, null)) {
+        if (batch.complete(response.baseOffset, response.logAppendTime)) {
             maybeRemoveAndDeallocateBatch(batch);
         }
     }
 
     private void failBatch(ProducerBatch batch,
                            ProduceResponse.PartitionResponse response,
-                           RuntimeException exception,
                            boolean adjustSequenceNumbers) {
-        failBatch(batch, response.baseOffset, response.logAppendTime, exception, adjustSequenceNumbers);
+        final RuntimeException topLevelException;
+        if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED)
+            topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
+        else if (response.error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+            topLevelException = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
+        else
+            topLevelException = response.error.exception(response.errorMessage);
+
+        if (response.recordErrors == null || response.recordErrors.isEmpty()) {
+            failBatch(batch, topLevelException, adjustSequenceNumbers);
+        } else {
+            Map<Integer, RuntimeException> recordErrorMap = new HashMap<>(response.recordErrors.size());
+            for (ProduceResponse.RecordError recordError : response.recordErrors) {
+                if (recordError.message != null) {
+                    recordErrorMap.put(recordError.batchIndex, response.error.exception(recordError.message));

Review comment:
       I think the expectation is that the error code returned at the partition level is meaningful for the record-level errors. The one we're really expecting here is `INVALID_RECORD`, right? I guess the alternative is to discard the partition error and raise `InvalidRecordException` directly.
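   
   As a rough illustration of that expectation (the message text is made up; only `Errors.INVALID_RECORD` and the `exception(String)` helper already used in this patch are assumed), the partition-level error code supplies the exception type and the record-level message only customizes the text:
   
   ```java
   import org.apache.kafka.common.errors.ApiException;
   import org.apache.kafka.common.protocol.Errors;
   
   public class RecordErrorTypeExample {
       public static void main(String[] args) {
           // The broker reports INVALID_RECORD at the partition level; the record-level entry
           // only carries a message, so the client builds the exception from both.
           String recordLevelMessage = "Compacted topic cannot accept message without key";
           ApiException e = Errors.INVALID_RECORD.exception(recordLevelMessage);
           System.out.println(e.getClass().getSimpleName() + ": " + e.getMessage());
       }
   }
   ```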




[GitHub] [kafka] hachikuji merged pull request #10445: KAFKA-12548; Propagate record error messages to application

Posted by GitBox <gi...@apache.org>.
hachikuji merged pull request #10445:
URL: https://github.com/apache/kafka/pull/10445


   

