You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/23 06:23:20 UTC

[kafka] branch trunk updated: MINOR: Fix ConcurrentModificationException in TransactionManager (#4608)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 66039b1  MINOR: Fix ConcurrentModificationException in TransactionManager (#4608)
66039b1 is described below

commit 66039b1312be6356a3cb12efa8e562d2264498d1
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Thu Feb 22 22:23:14 2018 -0800

    MINOR: Fix ConcurrentModificationException in TransactionManager (#4608)
---
 checkstyle/suppressions.xml                        |  2 +-
 .../producer/internals/TransactionManager.java     |  6 +++--
 .../producer/internals/TransactionManagerTest.java | 27 ++++++++++++++++++++++
 3 files changed, 32 insertions(+), 3 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index de1bdfd..f23805e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -54,7 +54,7 @@
               files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
 
     <suppress checks="JavaNCSS"
-              files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
+              files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/>
 
     <suppress checks="NPathComplexity"
               files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values|PluginUtils).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 006a12b..b242d5a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
@@ -568,14 +569,15 @@ public class TransactionManager {
         if (isTransactional())
             // We should not reset producer state if we are transactional. We will transition to a fatal error instead.
             return false;
-        for (TopicPartition topicPartition : partitionsWithUnresolvedSequences) {
+        for (Iterator<TopicPartition> iter = partitionsWithUnresolvedSequences.iterator(); iter.hasNext(); ) {
+            TopicPartition topicPartition = iter.next();
             if (!hasInflightBatches(topicPartition)) {
                 // The partition has been fully drained. At this point, the last ack'd sequence should be one less than
                 // next sequence destined for the partition. If so, the partition is fully resolved. If not, we should
                 // reset the sequence number if necessary.
                 if (isNextSequence(topicPartition, sequenceNumber(topicPartition))) {
                     // This would happen when a batch was expired, but subsequent batches succeeded.
-                    partitionsWithUnresolvedSequences.remove(topicPartition);
+                    iter.remove();
                 } else {
                     // We would enter this branch if all in flight batches were ultimately expired in the producer.
                     log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index fab139a..6fcf480 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -2213,6 +2213,33 @@ public class TransactionManagerTest {
         assertFalse(transactionManager.hasOngoingTransaction());
     }
 
+    @Test
+    public void testShouldResetProducerStateAfterResolvingSequences() throws InterruptedException, ExecutionException {
+        // Create a TransactionManager without a transactionalId to test
+        // shouldResetProducerStateAfterResolvingSequences.
+        TransactionManager manager = new TransactionManager(logContext, null, transactionTimeoutMs,
+            DEFAULT_RETRY_BACKOFF_MS);
+        assertFalse(manager.shouldResetProducerStateAfterResolvingSequences());
+        TopicPartition tp0 = new TopicPartition("foo", 0);
+        TopicPartition tp1 = new TopicPartition("foo", 1);
+        assertEquals(Integer.valueOf(0), manager.sequenceNumber(tp0));
+        assertEquals(Integer.valueOf(0), manager.sequenceNumber(tp1));
+
+        manager.incrementSequenceNumber(tp0, 1);
+        manager.incrementSequenceNumber(tp1, 1);
+        manager.maybeUpdateLastAckedSequence(tp0, 0);
+        manager.maybeUpdateLastAckedSequence(tp1, 0);
+        manager.markSequenceUnresolved(tp0);
+        manager.markSequenceUnresolved(tp1);
+        assertFalse(manager.shouldResetProducerStateAfterResolvingSequences());
+
+        manager.maybeUpdateLastAckedSequence(tp0, 5);
+        manager.incrementSequenceNumber(tp0, 1);
+        manager.markSequenceUnresolved(tp0);
+        manager.markSequenceUnresolved(tp1);
+        assertTrue(manager.shouldResetProducerStateAfterResolvingSequences());
+    }
+
     private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
         final long pid = 1L;
         final short epoch = 1;

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.