Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/11/06 03:51:18 UTC

[GitHub] [kafka] abbccdda opened a new pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

abbccdda opened a new pull request #9569:
URL: https://github.com/apache/kafka/pull/9569


   In KIP-588, we added the new `PRODUCER_FENCED` error code (90) to represent the existing ProducerFencedException. A producer client that sends a ProduceRequest but does not know this code would see it as an unknown error and would not know how to handle it. The fix is to revert the error code in the ProduceResponse to the known `INVALID_PRODUCER_EPOCH`.
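
The translation described above can be sketched as a small, self-contained mapping. This is only an illustration under stated assumptions: `ProduceError` and `ProduceErrorTranslator` are hypothetical stand-ins, not Kafka's own `Errors` enum or the code in this PR. The value 90 comes from the description above and 47 is the protocol code for `INVALID_PRODUCER_EPOCH`.

```java
// Illustrative stand-in for the broker's error enum; only the codes
// discussed here are modelled. This is not Kafka's Errors class.
enum ProduceError {
    NONE(0),
    INVALID_PRODUCER_EPOCH(47),
    PRODUCER_FENCED(90);

    final short code;

    ProduceError(int code) {
        this.code = (short) code;
    }
}

final class ProduceErrorTranslator {
    private ProduceErrorTranslator() { }

    // Hypothetical helper: downgrade PRODUCER_FENCED to the code that
    // older producers already understand; pass everything else through.
    static ProduceError forProduceResponse(ProduceError error) {
        return error == ProduceError.PRODUCER_FENCED
            ? ProduceError.INVALID_PRODUCER_EPOCH
            : error;
    }
}
```

With this shape, a per-partition error of `PRODUCER_FENCED` would reach the producer as `INVALID_PRODUCER_EPOCH`, while all other errors are left untouched.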
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] abbccdda commented on pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
abbccdda commented on pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#issuecomment-722795850


   @edenhill FYI





[GitHub] [kafka] abbccdda commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r523440605



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -713,6 +715,8 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized. See the exception for more details
+     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch

Review comment:
       I thought we would catch and rethrow as StreamsException:
   ```
   catch (final KafkaException error) {
       throw new StreamsException(
           formatException("Error encountered trying to commit a transaction"),
           error
       );
   }
   ```
   which is a fatal one.







[GitHub] [kafka] guozhangwang edited a comment on pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
guozhangwang edited a comment on pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#issuecomment-725014616


   Okay, I think I need to rephrase my thoughts above: right now, when aborting txns due to timeout, the txn coordinator would 1) first bump up the epoch, 2) append the prepare-abort log with the newly bumped epoch, and then 3) send the txn-markers to the partition leaders with the newly bumped epoch. As a result, the partition leaders will bump their local epoch of the pid as well.
   
   This makes sense for disallowing the old producer from appending more records with the old epoch, but as KIP-588 mentioned, it does not help differentiate the timeout scenario from the new initPID scenario, where only the latter actually involves a new producer. And although txn coordinators may be able to tell the difference based on their old cached values, the partition leader would not know at all, and hence forcing its fenced-producer error to always be translated to an invalid-producer-epoch error seems sub-optimal.
   
   IF we do want the partition leader to never return fenced-producer, then at least we should change that in `ProducerStateManager#checkProducerEpoch` and not in `KafkaApis`, and change `InvalidProducerEpochException` to an `ApiException`, **but we'd need to document very clearly what users should do upon getting InvalidProducerEpochException**; IF we'd like the partition leader to be able to distinguish txn timeout from new initPID as well, we should let the txn markers piggyback that information. **The benefit is that we do not need to expose a new exception and users still only get ProducerFencedException.**
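
To make the point about the partition leader concrete, here is a toy model of step 3 and of the epoch check on the leader. It is a sketch only: `PartitionLeaderModel` and its methods are hypothetical names, and the real bookkeeping in `ProducerStateManager` is considerably more involved.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of a partition leader's per-producer epoch table. All names are
// hypothetical; this is not the ProducerStateManager implementation.
final class PartitionLeaderModel {
    private final Map<Long, Short> latestEpochByProducerId = new HashMap<>();

    // Step 3 of the abort flow: a txn marker carrying the bumped epoch
    // raises the leader's local epoch for that producer id.
    void onTxnMarker(long producerId, short markerEpoch) {
        latestEpochByProducerId.merge(producerId, markerEpoch,
            (current, incoming) -> incoming > current ? incoming : current);
    }

    // Returns true if a produce request with this epoch would be accepted.
    // The leader only sees that the epoch is stale; it cannot tell whether
    // the bump came from a timeout abort or from a new initPID call.
    boolean acceptsAppend(long producerId, short requestEpoch) {
        Short latest = latestEpochByProducerId.get(producerId);
        return latest == null || requestEpoch >= latest;
    }
}
```

In this model both scenarios look identical from the leader's side, which is why distinguishing them would require extra information on the marker.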
   





[GitHub] [kafka] abbccdda commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r523264487



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -713,6 +715,8 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized. See the exception for more details
+     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch

Review comment:
       I see your point. As of today, if we throw KafkaException then the stream thread would die, as we wrap it as `StreamsException`, which is fatal. It's only gonna work if we throw the InvalidEpoch exception directly and let Streams catch and swallow it. Do you think that would be the correct way?







[GitHub] [kafka] guozhangwang commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r525559599



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
##########
@@ -40,6 +40,7 @@
      *                  RecordTooLargeException
      *                  UnknownServerException
      *                  UnknownProducerIdException
+     *                  InvalidProducerEpoch

Review comment:
       nit: `InvalidProducerEpochException`.







[GitHub] [kafka] abbccdda commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
abbccdda commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r523102604



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -713,6 +715,8 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized. See the exception for more details
+     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch

Review comment:
       Yes, they would be thrown from `TransactionManager#maybeFailWithError`, as the user is not required to handle the send callback. As for the `throws` clause, I feel it's not necessary, since we do have other fatal exceptions that are only declared in the comment, which I believe is intentional to keep the function signature concise.







[GitHub] [kafka] edenhill commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
edenhill commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r518582162



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -609,6 +609,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       mergedResponseStatus.forKeyValue { (topicPartition, status) =>
         if (status.error != Errors.NONE) {
+          if (status.error == Errors.PRODUCER_FENCED) {
+            // Produce request should always return the INVALID_PRODUCER_EPOCH to as

Review comment:
       "..always return INVALID_PRODUCER_EPOCH as the partition leader .."







   





[GitHub] [kafka] guozhangwang commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r520924649



##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -609,6 +609,12 @@ class KafkaApis(val requestChannel: RequestChannel,
 
       mergedResponseStatus.forKeyValue { (topicPartition, status) =>
         if (status.error != Errors.NONE) {
+          if (status.error == Errors.PRODUCER_FENCED) {

Review comment:
       I feel this is not the right place to fix this on the broker: if we think a produce request should never see a fenced producer exception, we should just modify `ProducerStateManager#checkProducerEpoch` to throw `InvalidProducerEpochException` instead.

##########
File path: clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
##########
@@ -16,14 +16,15 @@
  */
 package org.apache.kafka.common.internals;
 
-import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.common.errors.ApiException;
 
 /**
  * This exception indicates that the produce request sent to the partition leader
- * contains a non-matching producer epoch. When encountering this exception, the ongoing transaction
- * will be aborted and can be retried.
+ * contains a non-matching producer epoch. When encountering this exception, user should abort the ongoing transaction
+ * by calling {@link KafkaProducer#abortTransaction()} to reinitialize the producer state.
  */
-public class InvalidProducerEpochException extends RetriableException {

Review comment:
       If we are going to expose this exception, then 1) it cannot be in the internals package, 2) the public API javadocs should include this exception and also explain what users should do when getting this error, and 3) KIP-588 should include this, since it is now a publicly throwable exception.







[GitHub] [kafka] guozhangwang commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r522675518



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -713,6 +715,8 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized. See the exception for more details
+     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch

Review comment:
       For both of these functions: are we certain that InvalidProducerEpochException could be thrown directly from the call? For sendOffsetsToTransaction we are sending a commit-offset request to the consumer coordinator, which should not get this error code, and for both AddPartitionsToTxn and EndTxn responses we should only have the `INVALID_PRODUCER_EPOCH` error for version <= 1, right?
   
   Even if they are thrown, we should add them to the `throws` clause as well, similar to ProducerFencedException, right?

##########
File path: core/src/main/scala/kafka/log/ProducerStateManager.scala
##########
@@ -203,7 +203,7 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
       if (origin == AppendOrigin.Replication) {
         warn(message)
       } else {
-        throw new ProducerFencedException(message)
+        throw new InvalidProducerEpochException(message)

Review comment:
       Let's add a comment explaining why we change this to InvalidProducerEpochException.







[GitHub] [kafka] guozhangwang commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r523232452



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -713,6 +715,8 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized. See the exception for more details
+     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch

Review comment:
       In `maybeFailWithError` we only throw ProducerFenced directly if the lastError is ProducerFenced; otherwise we throw a `KafkaException` which includes the lastError as its cause. That's also why we only throw `ProducerFenced` in the function signature.
   
   Regarding other exceptions: IllegalState / Interrupt / KafkaExceptions are all unchecked exceptions; Authorization / Timeout / UnsupportedVersionException are ApiExceptions but are common ones. ProducerFenced is the only producer-specific checked exception, which we should add in throws.
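
The rethrow-or-wrap behaviour described above can be sketched as follows. The exception classes here are local stand-ins so the example compiles on its own (the real ones live in `org.apache.kafka.common` and `org.apache.kafka.common.errors`), and `LastErrorHolder` is a hypothetical name, not the actual `TransactionManager`.

```java
// Local stand-ins so the sketch is self-contained; not the Kafka classes.
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
    KafkaException(String message, Throwable cause) { super(message, cause); }
}

class ProducerFencedException extends KafkaException {
    ProducerFencedException(String message) { super(message); }
}

// Hypothetical holder for the last transactional error.
final class LastErrorHolder {
    private RuntimeException lastError;

    void fail(RuntimeException error) {
        this.lastError = error;
    }

    // A fenced error is rethrown as-is; anything else is wrapped, so the
    // original error is only reachable through getCause().
    void maybeFailWithError() {
        if (lastError == null) {
            return;
        }
        if (lastError instanceof ProducerFencedException) {
            throw lastError;
        }
        throw new KafkaException("transactional call failed, see cause", lastError);
    }
}
```

Under this shape, an error such as InvalidProducerEpoch would surface to the caller only as the cause of a `KafkaException`, never as the thrown type itself.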







[GitHub] [kafka] abbccdda merged pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
abbccdda merged pull request #9569:
URL: https://github.com/apache/kafka/pull/9569


   





[GitHub] [kafka] guozhangwang commented on a change in pull request #9569: KAFKA-10687: make ProduceRespone only returns INVALID_PRODUCER_EPOCH

Posted by GitBox <gi...@apache.org>.
guozhangwang commented on a change in pull request #9569:
URL: https://github.com/apache/kafka/pull/9569#discussion_r523277419



##########
File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##########
@@ -713,6 +715,8 @@ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offs
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized. See the exception for more details
+     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch

Review comment:
       Well, as the producer javadoc says:
   
   ```
   KafkaException if the producer has encountered a previous fatal or abortable error, or for any
        *         other unexpected error
   ```
   
   It is not always fatal, so the caller code, like Streams, should check the root cause upon getting the KafkaException and then handle it accordingly: if the root cause is InvalidProducerEpoch, it should abort and begin a new txn, right?
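
A minimal sketch of that root-cause check, assuming the caller receives a wrapping `KafkaException`. The exception classes are local stand-ins and `CommitErrorClassifier` with its `Action` enum is hypothetical; how Streams actually reacts is not shown in this thread.

```java
// Local stand-ins so the sketch is self-contained; not the Kafka classes.
class KafkaException extends RuntimeException {
    KafkaException(String message, Throwable cause) { super(message, cause); }
}

class InvalidProducerEpochException extends RuntimeException {
    InvalidProducerEpochException(String message) { super(message); }
}

final class CommitErrorClassifier {
    enum Action { ABORT_AND_RETRY, FAIL }

    private CommitErrorClassifier() { }

    // Walk the cause chain to its root and decide how the caller should react.
    static Action classify(KafkaException error) {
        Throwable root = error;
        while (root.getCause() != null && root.getCause() != root) {
            root = root.getCause();
        }
        return root instanceof InvalidProducerEpochException
            ? Action.ABORT_AND_RETRY   // abort the txn, then begin a new one
            : Action.FAIL;             // treat everything else as fatal
    }
}
```

A caller would invoke `classify` inside its `catch (KafkaException e)` block and only rethrow as a fatal error when the result is `FAIL`.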



