You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/04/01 18:49:40 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #10445: KAFKA-12548; Propagate record error messages to application

guozhangwang commented on a change in pull request #10445:
URL: https://github.com/apache/kafka/pull/10445#discussion_r605867393



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -181,20 +212,36 @@ public boolean isDone() {
      *
      * @param baseOffset The base offset of the messages assigned by the server
      * @param logAppendTime The log append time or -1 if CreateTime is being used
-     * @param exception The exception that occurred (or null if the request was successful)
+     * @param topLevelException The exception that occurred (or null if the request was successful)
+     * @param recordExceptionMap Record exception map keyed by batchIndex which overrides `topLevelException`
+     *                           for records present in the map
      * @return true if the batch was completed successfully and false if the batch was previously aborted
      */
-    public boolean done(long baseOffset, long logAppendTime, RuntimeException exception) {
-        final FinalState tryFinalState = (exception == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
-
+    private boolean done(
+        long baseOffset,
+        long logAppendTime,
+        RuntimeException topLevelException,
+        Map<Integer, RuntimeException> recordExceptionMap
+    ) {
+        final FinalState tryFinalState = (topLevelException == null) ? FinalState.SUCCEEDED : FinalState.FAILED;
         if (tryFinalState == FinalState.SUCCEEDED) {
             log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
         } else {
-            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, exception);
+            log.trace("Failed to produce messages to {} with base offset {}.", topicPartition, baseOffset, topLevelException);
         }
 
         if (this.finalState.compareAndSet(null, tryFinalState)) {
-            completeFutureAndFireCallbacks(baseOffset, logAppendTime, exception);
+            Function<Integer, RuntimeException> recordExceptions = null;
+            if (topLevelException != null) {
+                recordExceptions = batchIndex -> {
+                    if (recordExceptionMap == null) {
+                        return topLevelException;
+                    } else {
+                        return recordExceptionMap.getOrDefault(batchIndex, topLevelException);

Review comment:
       Is it possible that the returned response has both top-level exception and not-null record-level exceptions, and they are not the same? If yes should we let the record-level exception to override top-level exception here?

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
##########
@@ -689,30 +680,57 @@ private void completeBatch(ProducerBatch batch, ProduceResponse.PartitionRespons
             transactionManager.handleCompletedBatch(batch, response);
         }
 
-        if (batch.done(response.baseOffset, response.logAppendTime, null)) {
+        if (batch.complete(response.baseOffset, response.logAppendTime)) {
             maybeRemoveAndDeallocateBatch(batch);
         }
     }
 
     private void failBatch(ProducerBatch batch,
                            ProduceResponse.PartitionResponse response,
-                           RuntimeException exception,
                            boolean adjustSequenceNumbers) {
-        failBatch(batch, response.baseOffset, response.logAppendTime, exception, adjustSequenceNumbers);
+        final RuntimeException topLevelException;
+        if (response.error == Errors.TOPIC_AUTHORIZATION_FAILED)
+            topLevelException = new TopicAuthorizationException(Collections.singleton(batch.topicPartition.topic()));
+        else if (response.error == Errors.CLUSTER_AUTHORIZATION_FAILED)
+            topLevelException = new ClusterAuthorizationException("The producer is not authorized to do idempotent sends");
+        else
+            topLevelException = response.error.exception(response.errorMessage);
+
+        if (response.recordErrors == null || response.recordErrors.isEmpty()) {
+            failBatch(batch, topLevelException, adjustSequenceNumbers);
+        } else {
+            Map<Integer, RuntimeException> recordErrorMap = new HashMap<>(response.recordErrors.size());
+            for (ProduceResponse.RecordError recordError : response.recordErrors) {
+                if (recordError.message != null) {
+                    recordErrorMap.put(recordError.batchIndex, response.error.exception(recordError.message));

Review comment:
       Hmm.. would we ever have customized error message that are not from `Errors` map? E.g. if pluggable modules record some non-ak errors.

##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
##########
@@ -215,20 +262,25 @@ public boolean done(long baseOffset, long logAppendTime, RuntimeException except
         return false;
     }
 
-    private void completeFutureAndFireCallbacks(long baseOffset, long logAppendTime, RuntimeException exception) {
+    private void completeFutureAndFireCallbacks(
+        long baseOffset,
+        long logAppendTime,
+        Function<Integer, RuntimeException> recordExceptions
+    ) {
         // Set the future before invoking the callbacks as we rely on its state for the `onCompletion` call
-        produceFuture.set(baseOffset, logAppendTime, exception);
+        produceFuture.set(baseOffset, logAppendTime, recordExceptions);
 
         // execute callbacks
-        for (Thunk thunk : thunks) {
+        for (int i = 0; i < thunks.size(); i++) {
             try {
-                if (exception == null) {
+                Thunk thunk = thunks.get(i);
+                if (recordExceptions == null) {

Review comment:
       nit: not introduced in this PR, but why we only check `if (thunk.callback != null)` in the first branch and not the second? Maybe we can move this check upfront right after line 276.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org