Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/12/05 02:20:22 UTC

[GitHub] [kafka] abbccdda opened a new pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

abbccdda opened a new pull request #9700:
URL: https://github.com/apache/kafka/pull/9700


   We should catch InvalidProducerEpochException and rethrow it as TaskMigratedException, as we already do for ProducerFencedException.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9700:
URL: https://github.com/apache/kafka/pull/9700#issuecomment-742662578


   Cherry-picked to 2.7





[GitHub] [kafka] ableegoldman commented on a change in pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9700:
URL: https://github.com/apache/kafka/pull/9700#discussion_r537917953



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -199,7 +200,9 @@ private void recordSendError(final String topic, final Exception exception, fina
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
             sendException.set(new StreamsException(errorMessage, exception));
-        } else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
+        } else if (exception instanceof ProducerFencedException ||
+                exception instanceof InvalidProducerEpochException ||
+                exception instanceof OutOfOrderSequenceException) {

Review comment:
       Why is it that here we catch these three exceptions (ProducerFenced, InvalidProducerEpoch, and OutOfOrderSequence) and wrap them as TaskMigratedException, while in `StreamsProducer#send` we catch ProducerFenced, InvalidProducerEpoch, and UnknownProducerId and wrap those as TaskMigrated?

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -199,7 +200,9 @@ private void recordSendError(final String topic, final Exception exception, fina
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
             sendException.set(new StreamsException(errorMessage, exception));
-        } else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
+        } else if (exception instanceof ProducerFencedException ||
+                exception instanceof InvalidProducerEpochException ||
+                exception instanceof OutOfOrderSequenceException) {

Review comment:
       Just to clarify, I think my question/confusion is twofold:
   
   1) Why is it OutOfOrderSequence in one place and UnknownProducerId in another?
   2) What is the difference between these two code paths? Is it really possible, for example, for ProducerFencedException to sometimes be thrown directly from `Producer#send` and sometimes be passed along through the callback (as in `streamsProducer.send(serializedRecord, (metadata, exception)`)?
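
To make the second question concrete, here is a minimal sketch of the two routes an error can take out of a send call: thrown synchronously by the call itself, or handed to the callback afterwards. Everything in it is a hypothetical stand-in written for illustration (`FencedError`, `MigratedError`, and the toy `send` method); it does not use the Kafka client or reproduce the Streams code, and it only shows why the wrapping has to exist in both places.

```java
import java.util.function.BiConsumer;

// Hypothetical stand-ins for illustration only; these are NOT the real Kafka classes.
class FencedError extends RuntimeException {
    FencedError(String message) { super(message); }
}

class MigratedError extends RuntimeException {
    MigratedError(String message, Throwable cause) { super(message, cause); }
}

class TwoPathsSketch {
    // Toy producer: either throws from send() itself or reports the error via the callback.
    static void send(String record, boolean failSynchronously, BiConsumer<String, Exception> callback) {
        if (failSynchronously) {
            throw new FencedError("fenced while sending " + record);
        }
        callback.accept(null, new FencedError("fenced after sending " + record));
    }

    // Path 1: the error is thrown directly by send(), so the caller wraps it in a catch block.
    static RuntimeException sendAndWrapSync(String record) {
        try {
            send(record, true, (metadata, exception) -> { });
            return null;
        } catch (FencedError error) {
            return new MigratedError("wrapped from direct throw", error);
        }
    }

    // Path 2: the error arrives in the callback, so the wrapping has to happen there.
    static RuntimeException sendAndWrapAsync(String record) {
        RuntimeException[] holder = new RuntimeException[1];
        send(record, false, (metadata, exception) -> {
            if (exception instanceof FencedError) {
                holder[0] = new MigratedError("wrapped from callback", exception);
            }
        });
        return holder[0];
    }

    public static void main(String[] args) {
        System.out.println(sendAndWrapSync("r1").getMessage());
        System.out.println(sendAndWrapAsync("r2").getMessage());
    }
}
```

In this toy version the two handlers check the same exception type. The question raised in the review is why the real handlers check slightly different sets.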

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -184,12 +184,12 @@ public void resetProducer() {
         transactionInitialized = false;
     }
 
-    private void maybeBeginTransaction() throws ProducerFencedException {
+    private void maybeBeginTransaction() {
         if (eosEnabled() && !transactionInFlight) {
             try {
                 producer.beginTransaction();
                 transactionInFlight = true;
-            } catch (final ProducerFencedException error) {
+            } catch (final ProducerFencedException | InvalidProducerEpochException error) {

Review comment:
       ```suggestion
               } catch (final ProducerFencedException || InvalidProducerEpochException error) {
   ```
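
For reference, Java's multi-catch clause separates its alternatives with a single `|`; `||` is the boolean OR operator and is not accepted in a catch parameter. The sketch below shows the single-`|` form using two unrelated JDK exceptions as stand-ins for the Kafka exception types; the class and method names are hypothetical.

```java
class MultiCatchSketch {
    // Runs the action and labels the outcome; both exception types share one handler.
    static String classify(Runnable action) {
        try {
            action.run();
            return "ok";
        } catch (final IllegalStateException | UnsupportedOperationException error) {
            // A single `|` separates the alternatives of a multi-catch clause.
            return "migrated: " + error.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(classify(() -> { throw new IllegalStateException("fenced"); }));
        System.out.println(classify(() -> { throw new UnsupportedOperationException("epoch"); }));
        System.out.println(classify(() -> { }));
    }
}
```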







[GitHub] [kafka] ableegoldman commented on a change in pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9700:
URL: https://github.com/apache/kafka/pull/9700#discussion_r538900204



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -199,7 +200,9 @@ private void recordSendError(final String topic, final Exception exception, fina
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
             sendException.set(new StreamsException(errorMessage, exception));
-        } else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
+        } else if (exception instanceof ProducerFencedException ||
+                exception instanceof InvalidProducerEpochException ||
+                exception instanceof OutOfOrderSequenceException) {

Review comment:
       OK, sounds good. Agreed that we should keep things simple since this is a last-minute blocker.







[GitHub] [kafka] ableegoldman commented on a change in pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

Posted by GitBox <gi...@apache.org>.
ableegoldman commented on a change in pull request #9700:
URL: https://github.com/apache/kafka/pull/9700#discussion_r538899743



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -184,12 +184,12 @@ public void resetProducer() {
         transactionInitialized = false;
     }
 
-    private void maybeBeginTransaction() throws ProducerFencedException {
+    private void maybeBeginTransaction() {
         if (eosEnabled() && !transactionInFlight) {
             try {
                 producer.beginTransaction();
                 transactionInFlight = true;
-            } catch (final ProducerFencedException error) {
+            } catch (final ProducerFencedException | InvalidProducerEpochException error) {

Review comment:
       Oh lol sorry I was just comparing it with the other block. Obviously nvm







[GitHub] [kafka] abbccdda commented on a change in pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9700:
URL: https://github.com/apache/kafka/pull/9700#discussion_r538755571



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
##########
@@ -184,12 +184,12 @@ public void resetProducer() {
         transactionInitialized = false;
     }
 
-    private void maybeBeginTransaction() throws ProducerFencedException {
+    private void maybeBeginTransaction() {
         if (eosEnabled() && !transactionInFlight) {
             try {
                 producer.beginTransaction();
                 transactionInFlight = true;
-            } catch (final ProducerFencedException error) {
+            } catch (final ProducerFencedException | InvalidProducerEpochException error) {

Review comment:
       This won't work I suppose?







[GitHub] [kafka] abbccdda merged pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #9700:
URL: https://github.com/apache/kafka/pull/9700


   





[GitHub] [kafka] guozhangwang commented on a change in pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9700:
URL: https://github.com/apache/kafka/pull/9700#discussion_r538940544



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -199,7 +200,9 @@ private void recordSendError(final String topic, final Exception exception, fina
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
             sendException.set(new StreamsException(errorMessage, exception));
-        } else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
+        } else if (exception instanceof ProducerFencedException ||
+                exception instanceof InvalidProducerEpochException ||
+                exception instanceof OutOfOrderSequenceException) {

Review comment:
       I thought about this a bit, and I think both `UnknownProducerId` and `OutOfOrderSequence` could possibly be thrown from the callback or directly (though in the latter case they would be wrapped as KafkaException).
   
   I created two tickets, one for producer and one for streams to improve the general picture moving forward.
   
   https://issues.apache.org/jira/browse/KAFKA-10829
   https://issues.apache.org/jira/browse/KAFKA-10830
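
The wrapping mentioned in the comment above means that a handler which only inspects the top-level exception type can miss an error that arrives inside a wrapper. A minimal sketch of a check that also looks through the cause chain follows; `WrapperError` and `SequenceError` are hypothetical stand-ins for a wrapping exception and a wrapped one, and this is an assumption about one possible approach, not how Streams handles it.

```java
// Hypothetical stand-ins for illustration only; these are NOT the real Kafka classes.
class WrapperError extends RuntimeException {
    WrapperError(String message, Throwable cause) { super(message, cause); }
}

class SequenceError extends RuntimeException {
    SequenceError(String message) { super(message); }
}

class CauseCheckSketch {
    // Returns true if the error itself, or any exception in its cause chain, is a SequenceError.
    static boolean indicatesMigration(Throwable error) {
        for (Throwable current = error; current != null; current = current.getCause()) {
            if (current instanceof SequenceError) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable direct = new SequenceError("out of order");
        Throwable wrapped = new WrapperError("send failed", direct);
        Throwable unrelated = new WrapperError("send failed", new IllegalStateException("other"));
        System.out.println(indicatesMigration(direct));
        System.out.println(indicatesMigration(wrapped));
        System.out.println(indicatesMigration(unrelated));
    }
}
```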







[GitHub] [kafka] abbccdda commented on a change in pull request #9700: KAFKA-10813: InvalidProducerEpoch should be caught and throw as TaskMigrated

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9700:
URL: https://github.com/apache/kafka/pull/9700#discussion_r538667676



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
##########
@@ -199,7 +200,9 @@ private void recordSendError(final String topic, final Exception exception, fina
         if (isFatalException(exception)) {
             errorMessage += "\nWritten offsets would not be recorded and no more records would be sent since this is a fatal error.";
             sendException.set(new StreamsException(errorMessage, exception));
-        } else if (exception instanceof ProducerFencedException || exception instanceof OutOfOrderSequenceException) {
+        } else if (exception instanceof ProducerFencedException ||
+                exception instanceof InvalidProducerEpochException ||
+                exception instanceof OutOfOrderSequenceException) {

Review comment:
       I didn't think too hard about this since it is a blocker fix, and we don't want to risk a regression by changing the existing exception-catching logic. It would be good to address this as a follow-up.



