You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/06/18 21:28:47 UTC

[GitHub] [kafka] guozhangwang commented on a change in pull request #10880: KAFKA-12870; Flush in progress not cleared after transaction completion

guozhangwang commented on a change in pull request #10880:
URL: https://github.com/apache/kafka/pull/10880#discussion_r654615386



##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
         }
     }
 
+    @Test
+    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Exception {
+        try (Metrics m = new Metrics()) {
+            int lingerMs = 50;
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager, lingerMs);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send a couple records and assert that they are not sent immediately (due to linger).
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+
+            // Now begin the commit and assert that the Produce request is sent immediately
+            // without waiting for the linger.
+            txnManager.beginCommit();
+            runUntil(sender, client::hasInFlightRequests);
+
+            // Respond to the produce request and wait for the EndTxn request to be sent.
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+
+            // Finally, we want to assert that the linger time is still effective
+            // when the new transaction begins.
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            time.sleep(lingerMs - 1);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(accumulator.hasUndrained());
+
+            time.sleep(1);
+            runUntil(sender, client::hasInFlightRequests);
+            assertFalse(accumulator.hasUndrained());
+        }
+    }
+
+    @Test
+    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws Exception {
+        try (Metrics m = new Metrics()) {
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send one Produce request.
+            appendToAccumulator(tp0);
+            runUntil(sender, () -> client.requests().size() == 1);
+            assertFalse(accumulator.hasUndrained());
+            assertTrue(client.hasInFlightRequests());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Enqueue another record and then commit the transaction. We expect the unsent record to
+            // get sent before the transaction can be completed.
+            appendToAccumulator(tp0);
+            txnManager.beginCommit();
+            runUntil(sender, () -> client.requests().size() == 2);
+
+            assertTrue(txnManager.isCompleting());
+            assertFalse(txnManager.hasInFlightRequest());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Now respond to the pending Produce requests.
+            respondToProduce(tp0, Errors.NONE, 0L);
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Finally, respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+        }
+    }
+
+    private void addPartitionToTxn(Sender sender, TransactionManager txnManager, TopicPartition tp) {
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
+        runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+        assertFalse(txnManager.hasInFlightRequest());
+    }
+
+    private void respondToProduce(TopicPartition tp, Errors error, long offset) {
+        client.respond(
+            request -> request instanceof ProduceRequest,
+            produceResponse(tp, offset, error, 0)
+        );
+
+    }
+
+    private void respondToEndTxn(Errors error) {
+        client.respond(
+            request -> request instanceof EndTxnRequest,
+            new EndTxnResponse(new EndTxnResponseData()
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(0))
+        );
+    }
+
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       I remember we have similar utils in `TransactionManagerTest`; could we consolidate them?

##########
File path: clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
##########
@@ -2670,6 +2671,139 @@ public void testTransactionalRequestsSentOnShutdown() {
         }
     }
 
+    @Test
+    public void testRecordsFlushedImmediatelyOnTransactionCompletion() throws Exception {
+        try (Metrics m = new Metrics()) {
+            int lingerMs = 50;
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager, lingerMs);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send a couple records and assert that they are not sent immediately (due to linger).
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+
+            // Now begin the commit and assert that the Produce request is sent immediately
+            // without waiting for the linger.
+            txnManager.beginCommit();
+            runUntil(sender, client::hasInFlightRequests);
+
+            // Respond to the produce request and wait for the EndTxn request to be sent.
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+
+            // Finally, we want to assert that the linger time is still effective
+            // when the new transaction begins.
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            appendToAccumulator(tp0);
+            appendToAccumulator(tp0);
+            time.sleep(lingerMs - 1);
+            sender.runOnce();
+            assertFalse(client.hasInFlightRequests());
+            assertTrue(accumulator.hasUndrained());
+
+            time.sleep(1);
+            runUntil(sender, client::hasInFlightRequests);
+            assertFalse(accumulator.hasUndrained());
+        }
+    }
+
+    @Test
+    public void testAwaitPendingRecordsBeforeCommittingTransaction() throws Exception {
+        try (Metrics m = new Metrics()) {
+            SenderMetricsRegistry senderMetrics = new SenderMetricsRegistry(m);
+
+            TransactionManager txnManager = new TransactionManager(logContext, "txnId", 6000, 100, apiVersions);
+            setupWithTransactionState(txnManager);
+
+            Sender sender = new Sender(logContext, client, metadata, this.accumulator, false, MAX_REQUEST_SIZE, ACKS_ALL,
+                1, senderMetrics, time, REQUEST_TIMEOUT, RETRY_BACKOFF_MS, txnManager, apiVersions);
+
+            // Begin a transaction and successfully add one partition to it.
+            ProducerIdAndEpoch producerIdAndEpoch = new ProducerIdAndEpoch(123456L, (short) 0);
+            doInitTransactions(txnManager, producerIdAndEpoch);
+            txnManager.beginTransaction();
+            addPartitionToTxn(sender, txnManager, tp0);
+
+            // Send one Produce request.
+            appendToAccumulator(tp0);
+            runUntil(sender, () -> client.requests().size() == 1);
+            assertFalse(accumulator.hasUndrained());
+            assertTrue(client.hasInFlightRequests());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Enqueue another record and then commit the transaction. We expect the unsent record to
+            // get sent before the transaction can be completed.
+            appendToAccumulator(tp0);
+            txnManager.beginCommit();
+            runUntil(sender, () -> client.requests().size() == 2);
+
+            assertTrue(txnManager.isCompleting());
+            assertFalse(txnManager.hasInFlightRequest());
+            assertTrue(txnManager.hasInflightBatches(tp0));
+
+            // Now respond to the pending Produce requests.
+            respondToProduce(tp0, Errors.NONE, 0L);
+            respondToProduce(tp0, Errors.NONE, 1L);
+            runUntil(sender, txnManager::hasInFlightRequest);
+
+            // Finally, respond to the expected EndTxn request.
+            respondToEndTxn(Errors.NONE);
+            runUntil(sender, txnManager::isReady);
+        }
+    }
+
+    private void addPartitionToTxn(Sender sender, TransactionManager txnManager, TopicPartition tp) {
+        txnManager.maybeAddPartitionToTransaction(tp);
+        client.prepareResponse(new AddPartitionsToTxnResponse(0, Collections.singletonMap(tp, Errors.NONE)));
+        runUntil(sender, () -> txnManager.isPartitionAdded(tp));
+        assertFalse(txnManager.hasInFlightRequest());
+    }
+
+    private void respondToProduce(TopicPartition tp, Errors error, long offset) {
+        client.respond(
+            request -> request instanceof ProduceRequest,
+            produceResponse(tp, offset, error, 0)
+        );
+
+    }
+
+    private void respondToEndTxn(Errors error) {
+        client.respond(
+            request -> request instanceof EndTxnRequest,
+            new EndTxnResponse(new EndTxnResponseData()
+                .setErrorCode(error.code())
+                .setThrottleTimeMs(0))
+        );
+    }
+
+    private void runUntil(Sender sender, Supplier<Boolean> condition) {

Review comment:
       Yup, I think that's also fine.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org