You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/23 06:26:55 UTC
[kafka] branch 1.1 updated: MINOR: Fix
ConcurrentModificationException in TransactionManager (#4608)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 1.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/1.1 by this push:
new 7c0e7b4 MINOR: Fix ConcurrentModificationException in TransactionManager (#4608)
7c0e7b4 is described below
commit 7c0e7b47dae3b18f8dd5191f5107577b6a2b6381
Author: Colin Patrick McCabe <co...@cmccabe.xyz>
AuthorDate: Thu Feb 22 22:23:14 2018 -0800
MINOR: Fix ConcurrentModificationException in TransactionManager (#4608)
---
checkstyle/suppressions.xml | 2 +-
.../producer/internals/TransactionManager.java | 6 +++--
.../producer/internals/TransactionManagerTest.java | 27 ++++++++++++++++++++++
3 files changed, 32 insertions(+), 3 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index de1bdfd..f23805e 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -54,7 +54,7 @@
files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
<suppress checks="JavaNCSS"
- files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
+ files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java|TransactionManagerTest.java"/>
<suppress checks="NPathComplexity"
files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values|PluginUtils).java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 006a12b..b242d5a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -51,6 +51,7 @@ import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
@@ -568,14 +569,15 @@ public class TransactionManager {
if (isTransactional())
// We should not reset producer state if we are transactional. We will transition to a fatal error instead.
return false;
- for (TopicPartition topicPartition : partitionsWithUnresolvedSequences) {
+ for (Iterator<TopicPartition> iter = partitionsWithUnresolvedSequences.iterator(); iter.hasNext(); ) {
+ TopicPartition topicPartition = iter.next();
if (!hasInflightBatches(topicPartition)) {
// The partition has been fully drained. At this point, the last ack'd sequence should be once less than
// next sequence destined for the partition. If so, the partition is fully resolved. If not, we should
// reset the sequence number if necessary.
if (isNextSequence(topicPartition, sequenceNumber(topicPartition))) {
// This would happen when a batch was expired, but subsequent batches succeeded.
- partitionsWithUnresolvedSequences.remove(topicPartition);
+ iter.remove();
} else {
// We would enter this branch if all in flight batches were ultimately expired in the producer.
log.info("No inflight batches remaining for {}, last ack'd sequence for partition is {}, next sequence is {}. " +
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index fab139a..6fcf480 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -2213,6 +2213,33 @@ public class TransactionManagerTest {
assertFalse(transactionManager.hasOngoingTransaction());
}
+ @Test
+ public void testShouldResetProducerStateAfterResolvingSequences() throws InterruptedException, ExecutionException {
+ // Create a TransactionManager without a transactionalId to test
+ // shouldResetProducerStateAfterResolvingSequences.
+ TransactionManager manager = new TransactionManager(logContext, null, transactionTimeoutMs,
+ DEFAULT_RETRY_BACKOFF_MS);
+ assertFalse(manager.shouldResetProducerStateAfterResolvingSequences());
+ TopicPartition tp0 = new TopicPartition("foo", 0);
+ TopicPartition tp1 = new TopicPartition("foo", 1);
+ assertEquals(Integer.valueOf(0), manager.sequenceNumber(tp0));
+ assertEquals(Integer.valueOf(0), manager.sequenceNumber(tp1));
+
+ manager.incrementSequenceNumber(tp0, 1);
+ manager.incrementSequenceNumber(tp1, 1);
+ manager.maybeUpdateLastAckedSequence(tp0, 0);
+ manager.maybeUpdateLastAckedSequence(tp1, 0);
+ manager.markSequenceUnresolved(tp0);
+ manager.markSequenceUnresolved(tp1);
+ assertFalse(manager.shouldResetProducerStateAfterResolvingSequences());
+
+ manager.maybeUpdateLastAckedSequence(tp0, 5);
+ manager.incrementSequenceNumber(tp0, 1);
+ manager.markSequenceUnresolved(tp0);
+ manager.markSequenceUnresolved(tp1);
+ assertTrue(manager.shouldResetProducerStateAfterResolvingSequences());
+ }
+
private void verifyAddPartitionsFailsWithPartitionLevelError(final Errors error) throws InterruptedException {
final long pid = 1L;
final short epoch = 1;
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.