You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/07 23:54:56 UTC

[GitHub] [kafka] ableegoldman commented on a change in pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

ableegoldman commented on a change in pull request #9700:
URL: https://github.com/apache/kafka/pull/9700#discussion_r537917953



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -199,7 +200,9 @@ private void recordSendError(final String topic, final Exception exception, fina
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
             sendException.set(new StreamsException(errorMessage, exception));
-        } else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
+        } else if (exception instanceof ProducerFencedException ||
+                exception instanceof InvalidProducerEpochException ||
+                exception instanceof OutOfOrderSequenceException) {

Review comment:
       Why is that here, we catch these three exceptions -- ProducerFenced, InvalidProducerEpoch, and OutOfOrderSequence -- and wrap them as TaskMigratedException, while in `StreamsProducer#send`, we catch  ProducerFenced, InvalidProducerEpoch, and UnknownProducerId exceptions and wrap those as TaskMigrated?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -199,7 +200,9 @@ private void recordSendError(final String topic, final Exception exception, fina
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
             sendException.set(new StreamsException(errorMessage, exception));
-        } else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
+        } else if (exception instanceof ProducerFencedException ||
+                exception instanceof InvalidProducerEpochException ||
+                exception instanceof OutOfOrderSequenceException) {

Review comment:
       Just to clarify, I think my question/confusion is twofold:
   
   1) Why is it OutOfOrderSequence in one place and UnknownProducerId in another?
   2) What is the difference between these two code paths? Is it really possible for example for the ProducerFencedException to sometimes be thrown directly from `Producer#send`, and sometimes be passed along through the callback ( as in `streamsProducer.send(serializedRecord, (metadata, exception)` )

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -184,12 +184,12 @@ public void resetProducer() {
         transactionInitialized = false;
     }
 
-    private void maybeBeginTransaction() throws ProducerFencedException {
+    private void maybeBeginTransaction() {
         if (eosEnabled() && !transactionInFlight) {
             try {
                 producer.beginTransaction();
                 transactionInFlight = true;
-            } catch (final ProducerFencedException error) {
+            } catch (final ProducerFencedException | InvalidProducerEpochException error) {

Review comment:
       ```suggestion
               } catch (final ProducerFencedException || InvalidProducerEpochException error) {
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org