You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by bb...@apache.org on 2020/11/18 22:05:22 UTC
[kafka] branch 2.7 updated: Cherry-pick KAFKA-10687 to 2.7 (#9613)
This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new e25c6c9 Cherry-pick KAFKA-10687 to 2.7 (#9613)
e25c6c9 is described below
commit e25c6c9885a655a6d147d8781e4e522fc6f89d8b
Author: Boyang Chen <bo...@confluent.io>
AuthorDate: Wed Nov 18 14:04:32 2020 -0800
Cherry-pick KAFKA-10687 to 2.7 (#9613)
Ensures INVALID_PRODUCER_EPOCH is recognizable from the client side, and ensures the ProduceResponse always uses the old error code, INVALID_PRODUCER_EPOCH.
Reviewers: Guozhang Wang <wa...@gmail.com>; 2.7 review: Bill Bejeck <bb...@apache.org>
---
.../apache/kafka/clients/producer/Callback.java | 1 +
.../kafka/clients/producer/KafkaProducer.java | 6 +++
.../producer/internals/TransactionManager.java | 7 ++--
.../InvalidProducerEpochException.java | 11 +++--
.../org/apache/kafka/common/protocol/Errors.java | 2 +-
.../producer/internals/TransactionManagerTest.java | 6 +--
.../scala/kafka/log/ProducerStateManager.scala | 5 ++-
.../integration/kafka/api/TransactionsTest.scala | 6 +--
core/src/test/scala/unit/kafka/log/LogTest.scala | 4 +-
.../unit/kafka/log/ProducerStateManagerTest.scala | 6 +--
.../scala/unit/kafka/server/KafkaApisTest.scala | 47 ++++++++++++++++++++++
.../processor/internals/StreamsProducer.java | 3 +-
.../processor/internals/StreamsProducerTest.java | 19 +++++++++
13 files changed, 99 insertions(+), 24 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
index da2a16b..ee0610e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/Callback.java
@@ -40,6 +40,7 @@ public interface Callback {
* RecordTooLargeException
* UnknownServerException
* UnknownProducerIdException
+ * InvalidProducerEpochException
*
* Retriable exceptions (transient, may be covered by increasing #.retries):
*
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a7e867e..1ce0b93 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -639,6 +639,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* format used for the offsets topic on the broker does not support transactions
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized, or the consumer group id is not authorized.
+ * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
+ * to the partition leader. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
*/
@@ -680,6 +682,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* (e.g. if the consumer has been kicked out of the group). Users should handle this by aborting the transaction.
* @throws org.apache.kafka.common.errors.FencedInstanceIdException if this producer instance gets fenced by broker due to a
* mis-configured consumer instance id within group metadata.
+ * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
+ * to the partition leader. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
* @throws TimeoutException if the time taken for sending offsets has surpassed max.block.ms.
@@ -713,6 +717,8 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
* does not support transactions (i.e. if its version is lower than 0.11.0.0)
* @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured
* transactional.id is not authorized. See the exception for more details
+ * @throws org.apache.kafka.common.errors.InvalidProducerEpochException if the producer has attempted to produce with an old epoch
+ * to the partition leader. See the exception for more details
* @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any
* other unexpected error
* @throws TimeoutException if the time taken for committing the transaction has surpassed <code>max.block.ms</code>.
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index ca53dba..d77ad41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -25,6 +25,7 @@ import org.apache.kafka.clients.consumer.CommitFailedException;
import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.errors.InvalidPidMappingException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.RetriableException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.apache.kafka.common.protocol.ApiKeys;
@@ -1014,10 +1015,6 @@ public class TransactionManager {
requestEpochBumpForPartition(batch.topicPartition);
return true;
}
- } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
- // Retry the initProducerId to bump the epoch and continue.
- requestEpochBumpForPartition(batch.topicPartition);
- return true;
} else if (error == Errors.OUT_OF_ORDER_SEQUENCE_NUMBER) {
if (!hasUnresolvedSequence(batch.topicPartition) &&
(batch.sequenceHasBeenReset() || !isNextSequence(batch.topicPartition, batch.baseSequence()))) {
@@ -1096,6 +1093,8 @@ public class TransactionManager {
if (lastError instanceof ProducerFencedException) {
throw new ProducerFencedException("The producer has been rejected from the broker because " +
"it tried to use an old epoch with the transactionalId");
+ } else if (lastError instanceof InvalidProducerEpochException) {
+ throw new InvalidProducerEpochException("Producer attempted to produce with an old epoch " + producerIdAndEpoch);
} else {
throw new KafkaException("Cannot execute transactional method because we are in an error state", lastError);
}
diff --git a/clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java b/clients/src/main/java/org/apache/kafka/common/errors/InvalidProducerEpochException.java
similarity index 78%
rename from clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
rename to clients/src/main/java/org/apache/kafka/common/errors/InvalidProducerEpochException.java
index db78f33..79b8236 100644
--- a/clients/src/main/java/org/apache/kafka/common/internals/InvalidProducerEpochException.java
+++ b/clients/src/main/java/org/apache/kafka/common/errors/InvalidProducerEpochException.java
@@ -14,16 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.common.internals;
-
-import org.apache.kafka.common.errors.RetriableException;
+package org.apache.kafka.common.errors;
/**
* This exception indicates that the produce request sent to the partition leader
- * contains a non-matching producer epoch. When encountering this exception, the ongoing transaction
- * will be aborted and can be retried.
+ * contains a non-matching producer epoch. When encountering this exception, user should abort the ongoing transaction
+ * by calling KafkaProducer#abortTransaction which would try to send initPidRequest and reinitialize the producer
+ * under the hood.
*/
-public class InvalidProducerEpochException extends RetriableException {
+public class InvalidProducerEpochException extends ApiException {
private static final long serialVersionUID = 1L;
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index 3c3b800..b42f7bf 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -114,7 +114,7 @@ import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
import org.apache.kafka.common.errors.UnsupportedSaslMechanismException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
-import org.apache.kafka.common.internals.InvalidProducerEpochException;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 53fb273..ee3f892 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -1685,13 +1685,13 @@ public class TransactionManagerTest {
sender.runOnce();
runUntil(responseFuture::isDone);
- assertFalse(transactionManager.hasError());
+ assertTrue(transactionManager.hasError());
- transactionManager.beginCommit();
+ transactionManager.beginAbort();
TransactionManager.TxnRequestHandler handler = transactionManager.nextRequest(false);
- // First we will get an EndTxn for commit.
+ // First we will get an EndTxn for abort.
assertNotNull(handler);
assertTrue(handler.requestBuilder() instanceof EndTxnRequest.Builder);
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index a7eaf1a..c1d2704 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -202,7 +202,10 @@ private[log] class ProducerAppendInfo(val topicPartition: TopicPartition,
if (origin == AppendOrigin.Replication) {
warn(message)
} else {
- throw new ProducerFencedException(message)
+ // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the
+ // producer send response callback to differentiate from the former fatal exception,
+ // letting client abort the ongoing transaction and retry.
+ throw new InvalidProducerEpochException(message)
}
}
}
diff --git a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
index e3825a1..1d3f968 100644
--- a/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
+++ b/core/src/test/scala/integration/kafka/api/TransactionsTest.scala
@@ -29,7 +29,7 @@ import kafka.utils.TestUtils
import kafka.utils.TestUtils.consumeRecords
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
-import org.apache.kafka.common.errors.{ProducerFencedException, TimeoutException}
+import org.apache.kafka.common.errors.{InvalidProducerEpochException, ProducerFencedException, TimeoutException}
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -468,7 +468,7 @@ class TransactionsTest extends KafkaServerTestHarness {
producer2.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic2, null, "2", "4", willBeCommitted = true)).get()
try {
- val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "5", willBeCommitted = false))
+ val result = producer1.send(TestUtils.producerRecordWithExpectedTransactionStatus(topic1, null, "1", "5", willBeCommitted = false))
val recordMetadata = result.get()
error(s"Missed a producer fenced exception when writing to ${recordMetadata.topic}-${recordMetadata.partition}. Grab the logs!!")
servers.foreach { server =>
@@ -479,7 +479,7 @@ class TransactionsTest extends KafkaServerTestHarness {
case _: ProducerFencedException =>
producer1.close()
case e: ExecutionException =>
- assertTrue(e.getCause.isInstanceOf[ProducerFencedException])
+ assertTrue(e.getCause.isInstanceOf[InvalidProducerEpochException])
case e: Exception =>
fail("Got an unexpected exception from a fenced producer.", e)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 43061b6..b5fa3b5 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -1630,7 +1630,7 @@ class LogTest {
assertEquals(5L, log.logEndOffset)
}
- @Test(expected = classOf[ProducerFencedException])
+ @Test(expected = classOf[InvalidProducerEpochException])
def testOldProducerEpoch(): Unit = {
// create a log
val log = createLog(logDir, LogConfig())
@@ -4104,7 +4104,7 @@ class LogTest {
val log = createLog(logDir, logConfig)
appendEndTxnMarkerAsLeader(log, producerId, epoch, ControlRecordType.ABORT, coordinatorEpoch = 1)
- assertThrows[ProducerFencedException] {
+ assertThrows[InvalidProducerEpochException] {
appendEndTxnMarkerAsLeader(log, producerId, (epoch - 1).toShort, ControlRecordType.ABORT, coordinatorEpoch = 1)
}
}
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index bb9d9ba..8f0ed49 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -78,7 +78,7 @@ class ProducerStateManagerTest {
append(stateManager, producerId, (epoch + 1).toShort, 0, 0L, 3L)
// Incorrect epoch
- assertThrows[ProducerFencedException] {
+ assertThrows[InvalidProducerEpochException] {
append(stateManager, producerId, epoch, 0, 0L, 4L)
}
}
@@ -94,7 +94,7 @@ class ProducerStateManagerTest {
assertEquals(RecordBatch.NO_SEQUENCE, firstEntry.lastSeq)
// Fencing should continue to work even if the marker is the only thing left
- assertThrows[ProducerFencedException] {
+ assertThrows[InvalidProducerEpochException] {
append(stateManager, producerId, 0.toShort, 0, 0L, 4L)
}
@@ -742,7 +742,7 @@ class ProducerStateManagerTest {
isTransactional = true, origin = AppendOrigin.Coordinator)
}
- @Test(expected = classOf[ProducerFencedException])
+ @Test(expected = classOf[InvalidProducerEpochException])
def testOldEpochForControlRecord(): Unit = {
val epoch = 5.toShort
val sequence = 0
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e4136ed..e5dda6f 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -723,6 +723,53 @@ class KafkaApisTest {
}
@Test
+ def shouldReplaceProducerFencedWithInvalidProducerEpochInProduceResponse(): Unit = {
+ val topic = "topic"
+ setupBasicMetadataCache(topic, numPartitions = 2)
+
+ for (version <- ApiKeys.PRODUCE.oldestVersion to ApiKeys.PRODUCE.latestVersion) {
+
+ EasyMock.reset(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+
+ val responseCallback: Capture[Map[TopicPartition, PartitionResponse] => Unit] = EasyMock.newCapture()
+
+ val tp = new TopicPartition("topic", 0)
+
+ val produceRequest = ProduceRequest.Builder.forCurrentMagic(1, 5000,
+ collection.mutable.Map(tp -> MemoryRecords.withRecords(CompressionType.NONE,
+ new SimpleRecord("test".getBytes))).asJava)
+ .build(version.toShort)
+ val request = buildRequest(produceRequest)
+
+ EasyMock.expect(replicaManager.appendRecords(EasyMock.anyLong(),
+ EasyMock.anyShort(),
+ EasyMock.eq(false),
+ EasyMock.eq(AppendOrigin.Client),
+ EasyMock.anyObject(),
+ EasyMock.capture(responseCallback),
+ EasyMock.anyObject(),
+ EasyMock.anyObject())
+ ).andAnswer(() => responseCallback.getValue.apply(Map(tp -> new PartitionResponse(Errors.INVALID_PRODUCER_EPOCH))))
+
+ val capturedResponse = expectNoThrottling()
+ EasyMock.expect(clientQuotaManager.maybeRecordAndGetThrottleTimeMs(
+ anyObject[RequestChannel.Request](), anyDouble, anyLong)).andReturn(0)
+
+ EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel, txnCoordinator)
+
+ createKafkaApis().handleProduceRequest(request)
+
+ val response = readResponse(ApiKeys.PRODUCE, produceRequest, capturedResponse)
+ .asInstanceOf[ProduceResponse]
+
+ assertEquals(1, response.responses().size())
+ for (partitionResponse <- response.responses().asScala) {
+ assertEquals(Errors.INVALID_PRODUCER_EPOCH, partitionResponse._2.error)
+ }
+ }
+ }
+
+ @Test
def testAddPartitionsToTxnWithInvalidPartition(): Unit = {
val topic = "topic"
setupBasicMetadataCache(topic, numPartitions = 1)
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
index c39ff20..e0469f6 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsProducer.java
@@ -29,6 +29,7 @@ import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
@@ -243,7 +244,7 @@ public class StreamsProducer {
producer.sendOffsetsToTransaction(offsets, consumerGroupMetadata);
producer.commitTransaction();
transactionInFlight = false;
- } catch (final ProducerFencedException | CommitFailedException error) {
+ } catch (final ProducerFencedException | InvalidProducerEpochException | CommitFailedException error) {
throw new TaskMigratedException(
formatException("Producer got fenced trying to commit a transaction"),
error
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
index c46a4c3..0747885 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsProducerTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InvalidProducerEpochException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
@@ -931,6 +932,24 @@ public class StreamsProducerTest {
}
@Test
+ public void shouldThrowTaskMigratedExceptionOnEosWithInvalidProducerEpoch() {
+ eosAlphaMockProducer.commitTransactionException = new InvalidProducerEpochException("KABOOM!");
+
+ final TaskMigratedException thrown = assertThrows(
+ TaskMigratedException.class,
+ () -> eosAlphaStreamsProducer.commitTransaction(offsetsAndMetadata, new ConsumerGroupMetadata("appId"))
+ );
+
+ assertThat(eosAlphaMockProducer.sentOffsets(), is(true));
+ assertThat(thrown.getCause(), is(eosAlphaMockProducer.commitTransactionException));
+ assertThat(
+ thrown.getMessage(),
+ is("Producer got fenced trying to commit a transaction [test];" +
+ " it means all tasks belonging to this thread should be migrated.")
+ );
+ }
+
+ @Test
public void shouldFailOnEosSendOffsetFatal() {
eosAlphaMockProducer.sendOffsetsToTransactionException = new RuntimeException("KABOOM!");