You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/11/18 22:05:22 UTC

[kafka] branch 2.7 updated: Cherry-pick KAFKA-10687 to 2.7 (#9613)

This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new e25c6c9  Cherry-pick KAFKA-10687 to 2.7 (#9613)
e25c6c9 is described below

commit e25c6c9885a655a6d147d8781e4e522fc6f89d8b
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed Nov 18 14:04:32 2020 -0800

    Cherry-pick KAFKA-10687 to 2.7 (#9613)
    
    Ensures INVALID_PRODUCER_EPOCH is recognizable on the client side, and ensures that the ProduceResponse always uses the old error code, INVALID_PRODUCER_EPOCH.
    
    Reviewers: Guozhang Wang <wa...@gmail.com> 2.7 review Bill Bejeck <bb...@apache.org>
---
 .../apache/kafka/clients/producer/Callback.java    |  1 +
 .../kafka/clients/producer/KafkaProducer.java      |  6 +++
 .../producer/internals/TransactionManager.java     |  7 ++--
 .../InvalidProducerEpochException.java             | 11 +++--
 .../org/apache/kafka/common/protocol/Errors.java   |  2 +-
 .../producer/internals/TransactionManagerTest.java |  6 +--
 .../scala/kafka/log/ProducerStateManager.scala     |  5 ++-
 .../integration/kafka/api/TransactionsTest.scala   |  6 +--
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  4 +-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |  6 +--
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 47 ++++++++++++++++++++++
 .../processor/internals/StreamsProducer.java       |  3 +-
 .../processor/internals/StreamsProducerTest.java   | 19 +++++++++
 13 files changed, 99 insertions(+), 24 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
index da2a16b..ee0610e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
@@ -40,6 +40,7 @@ public interface Callback {
      *                  RecordTooLargeException
      *                  UnknownServerException
      *                  UnknownProducerIdException
+     *                  InvalidProducerEpochException
      *
      *                  Retriable exceptions (transient, may be covered by increasing #.retries):
      *
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a7e867e..1ce0b93 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -639,6 +639,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *         format used for the offsets topic on the broker does not support transactions
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized, or the consumer group id is not authorized.
+     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
+     *         to the partition leader. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
      *         other unexpected error
      */
@@ -680,6 +682,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *         (e.g. if the consumer has been kicked out of the group). Users should handle this by aborting the transaction.
      * @throws org.apache.kafka.common.errors.FencedInstanceIdException if this producer instance gets fenced by broker due to a
      *                                                                  mis-configured consumer instance id within group metadata.
+     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
+     *         to the partition leader. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
      *         other unexpected error
      * @throws TimeoutException if the time taken for sending offsets has surpassed max.block.ms.
@@ -713,6 +717,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      *         does not support transactions (i.e. if its version is lower than 0.11.0.0)
      * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
      *         transactional.id is not authorized. See the exception for more details
+     * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
+     *         to the partition leader. See the exception for more details
      * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
      *         other unexpected error
      * @throws TimeoutException if the time taken for committing the transaction has surpassed <code>max.block.ms</code>.
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index ca53dba..d77ad41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
 import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.errors.InvalidPidMappingException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.protocol.ApiKeys;
@@ -1014,10 +1015,6 @@ public class TransactionManager {
                 requestEpochBumpForPartition(batch.topicPartition);
                 return true;
             }
-        } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
-            // Retry the initProducerId to bump the epoch and continue.
-            requestEpochBumpForPartition(batch.topicPartition);
-            return true;
         } else if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
             if (!hasUnresolvedSequence(batch.topicPartition) &&
                     (batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()))) {
@@ -1096,6 +1093,8 @@ public class TransactionManager {
             if (lastError instanceof ProducerFencedException) {
                 throw new ProducerFencedException("The producer has been rejected from the broker because " +
                     "it tried to use an old epoch with the transactionalId");
+            } else if (lastError instanceof InvalidProducerEpochException) {
+                throw new InvalidProducerEpochException("Producer attempted to produce with an old epoch " + producerIdAndEpoch);
             } else {
                 throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
             }
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidProducerEpochException.java
similarity index 78%
rename from clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
rename to clients/src/main/java/org/apache/kafka/common/errors/InvalidProducerEpochException.java
index db78f33..79b8236 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidProducerEpochException.java
@@ -14,16 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.internals;
-
-import org.apache.kafka.common.errors.RetriableException;
+package org.apache.kafka.common.errors;
 
 /**
  * This exception indicates that the produce request sent to the partition leader
- * contains a non-matching producer epoch. When encountering this exception, the ongoing transaction
- * will be aborted and can be retried.
+ * contains a non-matching producer epoch. When encountering this exception, user should abort the ongoing transaction
+ * by calling KafkaProducer#abortTransaction which would try to send initPidRequest and reinitialize the producer
+ * under the hood.
  */
-public class InvalidProducerEpochException extends RetriableException {
+public class InvalidProducerEpochException extends ApiException {
 
     private static final long serialVersionUID = 1L;
 
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 3c3b800..b42f7bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -114,7 +114,7 @@ import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
 import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
 import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.internals.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 53fb273..ee3f892 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1685,13 +1685,13 @@ public class TransactionManagerTest {
         sender.runOnce();
 
         runUntil(responseFuture::isDone);
-        assertFalse(transactionManager.hasError());
+        assertTrue(transactionManager.hasError());
 
-        transactionManager.beginCommit();
+        transactionManager.beginAbort();
 
         TransactionManager.TxnRequestHandler handler = transactionManager.nextRequest(false);
 
-        // First we will get an EndTxn for commit.
+        // First we will get an EndTxn for abort.
         assertNotNull(handler);
         assertTrue(handler.requestBuilder() instanceof EndTxnRequest.Builder);
 
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index a7eaf1a..c1d2704 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -202,7 +202,10 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
       if (origin == AppendOrigin.Replication) {
         warn(message)
       } else {
-        throw new ProducerFencedException(message)
+        // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
+        // producer send response callback to differentiate from the former fatal exception,
+        // letting client abort the ongoing transaction and retry.
+        throw new InvalidProducerEpochException(message)
       }
     }
   }
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index e3825a1..1d3f968 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -29,7 +29,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils.consumeRecords
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.errors.{ProducerFencedException, TimeoutException}
+import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.junit.Assert._
 import org.junit.{After, Before, Test}
@@ -468,7 +468,7 @@ class TransactionsTest extends KafkaServerTestHarness {
     producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "4", willBeCommitted = true)).get()
 
     try {
-      val result =  producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "5", willBeCommitted = false))
+      val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "5", willBeCommitted = false))
       val recordMetadata = result.get()
       error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
       servers.foreach { server =>
@@ -479,7 +479,7 @@ class TransactionsTest extends KafkaServerTestHarness {
       case _: ProducerFencedException =>
         producer1.close()
       case e: ExecutionException =>
-        assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+        assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
       case e: Exception =>
         fail("Got an unexpected exception from a fenced producer.", e)
     }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 43061b6..b5fa3b5 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1630,7 +1630,7 @@ class LogTest {
     assertEquals(5L, log.logEndOffset)
   }
 
-  @Test(expected = classOf[ProducerFencedException])
+  @Test(expected = classOf[InvalidProducerEpochException])
   def testOldProducerEpoch(): Unit = {
     // create a log
     val log = createLog(logDir, LogConfig())
@@ -4104,7 +4104,7 @@ class LogTest {
     val log = createLog(logDir, logConfig)
     appendEndTxnMarkerAsLeader(log, producerId, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
 
-    assertThrows[ProducerFencedException] {
+    assertThrows[InvalidProducerEpochException] {
       appendEndTxnMarkerAsLeader(log, producerId, (epoch - 1).toShort, ControlRecordType.ABORT, coordinatorEpoch = 1)
     }
   }
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index bb9d9ba..8f0ed49 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -78,7 +78,7 @@ class ProducerStateManagerTest {
     append(stateManager, producerId, (epoch + 1).toShort, 0, 0L, 3L)
 
     // Incorrect epoch
-    assertThrows[ProducerFencedException] {
+    assertThrows[InvalidProducerEpochException] {
       append(stateManager, producerId, epoch, 0, 0L, 4L)
     }
   }
@@ -94,7 +94,7 @@ class ProducerStateManagerTest {
     assertEquals(RecordBatch.NO_SEQUENCE, firstEntry.lastSeq)
 
     // Fencing should continue to work even if the marker is the only thing left
-    assertThrows[ProducerFencedException] {
+    assertThrows[InvalidProducerEpochException] {
       append(stateManager, producerId, 0.toShort, 0, 0L, 4L)
     }
 
@@ -742,7 +742,7 @@ class ProducerStateManagerTest {
       isTransactional = true, origin = AppendOrigin.Coordinator)
   }
 
-  @Test(expected = classOf[ProducerFencedException])
+  @Test(expected = classOf[InvalidProducerEpochException])
   def testOldEpochForControlRecord(): Unit = {
     val epoch = 5.toShort
     val sequence = 0
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e4136ed..e5dda6f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -723,6 +723,53 @@ class KafkaApisTest {
   }
 
   @Test
+  def shouldReplaceProducerFencedWithInvalidProducerEpochInProduceResponse(): Unit = {
+    val topic = "topic"
+    setupBasicMetadataCache(topic, numPartitions = 2)
+
+    for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) {
+
+      EasyMock.reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+
+      val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+
+      val tp = new TopicPartition("topic", 0)
+
+      val produceRequest = ProduceRequest.Builder.forCurrentMagic(1, 5000,
+        collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE,
+          new SimpleRecord("test".getBytes))).asJava)
+      .build(version.toShort)
+      val request = buildRequest(produceRequest)
+
+      EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+        EasyMock.anyShort(),
+        EasyMock.eq(false),
+        EasyMock.eq(AppendOrigin.Client),
+        EasyMock.anyObject(),
+        EasyMock.capture(responseCallback),
+        EasyMock.anyObject(),
+        EasyMock.anyObject())
+      ).andAnswer(() => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
+
+      val capturedResponse = expectNoThrottling()
+      EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+        anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0)
+
+      EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+
+      createKafkaApis().handleProduceRequest(request)
+
+      val response = readResponse(ApiKeys.PRODUCE, produceRequest, capturedResponse)
+        .asInstanceOf[ProduceResponse]
+
+      assertEquals(1, response.responses().size())
+      for (partitionResponse <- response.responses().asScala) {
+        assertEquals(Errors.INVALID_PRODUCER_EPOCH, partitionResponse._2.error)
+      }
+    }
+  }
+
+  @Test
   def testAddPartitionsToTxnWithInvalidPartition(): Unit = {
     val topic = "topic"
     setupBasicMetadataCache(topic, numPartitions = 1)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index c39ff20..e0469f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
@@ -243,7 +244,7 @@ public class StreamsProducer {
             producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
             producer.commitTransaction();
             transactionInFlight = false;
-        } catch (final ProducerFencedException | CommitFailedException error) {
+        } catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException error) {
             throw new TaskMigratedException(
                 formatException("Producer got fenced trying to commit a transaction"),
                 error
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index c46a4c3..0747885 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownProducerIdException;
@@ -931,6 +932,24 @@ public class StreamsProducerTest {
     }
 
     @Test
+    public void shouldThrowTaskMigratedExceptionOnEosWithInvalidProducerEpoch() {
+        eosAlphaMockProducer.commitTransactionException = new InvalidProducerEpochException("KABOOM!");
+
+        final TaskMigratedException thrown = assertThrows(
+            TaskMigratedException.class,
+            () -> eosAlphaStreamsProducer.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId"))
+        );
+
+        assertThat(eosAlphaMockProducer.sentOffsets(), is(true));
+        assertThat(thrown.getCause(), is(eosAlphaMockProducer.commitTransactionException));
+        assertThat(
+            thrown.getMessage(),
+            is("Producer got fenced trying to commit a transaction [test];" +
+                   " it means all tasks belonging to this thread should be migrated.")
+        );
+    }
+
+    @Test
     public void shouldFailOnEosSendOffsetFatal() {
         eosAlphaMockProducer.sendOffsetsToTransactionException = new RuntimeException("KABOOM!");