Posted to commits@kafka.apache.org by sh...@apache.org on 2022/04/24 09:06:28 UTC

[kafka] branch trunk updated: KAFKA-13834: add test coverage for RecordAccumulatorTest (#12092)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new e8c675ed56 KAFKA-13834: add test coverage for RecordAccumulatorTest (#12092)
e8c675ed56 is described below

commit e8c675ed56034f0cb04aebefd377239d7a43373c
Author: ruanliang <ru...@163.com>
AuthorDate: Sun Apr 24 17:06:19 2022 +0800

    KAFKA-13834: add test coverage for RecordAccumulatorTest (#12092)
    
    Reviewers: Luke Chen <sh...@gmail.com>
---
 .../producer/internals/RecordAccumulatorTest.java  | 26 ++++++++++++++++------
 1 file changed, 19 insertions(+), 7 deletions(-)
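
For readers skimming the diff below: the new assertions drain with maxSize set to Integer.MAX_VALUE, so each node now returns two batches (node1 => [tp1, tp2], node2 => [tp3, tp4]) and verifyTopicPartitionInBatches can no longer assert exactly one batch per node list; it instead flattens the per-node lists with flatMap before comparing topic partitions. A minimal standalone sketch of that flattening step, using String stand-ins for ProducerBatch and a hypothetical class name (nothing Kafka-specific assumed, JDK only):

    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.stream.Collectors;

    public class DrainFlattenSketch {
        public static void main(String[] args) {
            // Stand-in for the drain result Map<Integer, List<ProducerBatch>>:
            // broker node id -> partitions of the batches drained from that node.
            Map<Integer, List<String>> nodeBatches = new HashMap<>();
            nodeBatches.put(1, Arrays.asList("test-0", "test-1")); // node1 => tp1, tp2
            nodeBatches.put(2, Arrays.asList("test-2", "test-3")); // node2 => tp3, tp4

            // Flatten the per-node lists before counting, as the patched helper does;
            // a single drain may now return more than one batch per node.
            List<String> drained = nodeBatches.values().stream()
                    .flatMap(Collection::stream)
                    .collect(Collectors.toList());

            System.out.println(drained.size()); // 4: total batches across both nodes
            System.out.println(drained);        // e.g. [test-0, test-1, test-2, test-3]
        }
    }

Run as-is, this prints the total batch count (4) and the flattened partition list, which is the shape of the assertEquals(tp.length, allTpBatchCount) check in the patched helper.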

diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index 7c3518a136..d5b89ea864 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -50,6 +50,7 @@ import org.mockito.Mockito;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashSet;
@@ -61,6 +62,7 @@ import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
 import static java.util.Arrays.asList;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -107,7 +109,7 @@ public class RecordAccumulatorTest {
         PartitionInfo part4 = new PartitionInfo(topic, partition4, node2, null, null);
 
         long batchSize = value.length + DefaultRecordBatch.RECORD_BATCH_OVERHEAD;
-        RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, 1024, CompressionType.NONE, 10);
+        RecordAccumulator accum = createTestRecordAccumulator((int) batchSize, Integer.MAX_VALUE, CompressionType.NONE, 10);
         Cluster cluster = new Cluster(null, Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3, part4),
                 Collections.emptySet(), Collections.emptySet());
 
@@ -142,15 +144,25 @@ public class RecordAccumulatorTest {
         // drain batches from 2 nodes: node1 => tp2, node2 => tp3 (because tp4 is muted)
         Map<Integer, List<ProducerBatch>> batches4 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), (int) batchSize, 0);
         verifyTopicPartitionInBatches(batches4, tp2, tp3);
+
+        // add records for tp1, tp2 and tp3, and unmute tp4
+        accum.append(tp1, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp2, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.append(tp3, 0L, key, value, Record.EMPTY_HEADERS, null, maxBlockTimeMs, false, time.milliseconds());
+        accum.unmutePartition(tp4);
+        // set maxSize to Integer.MAX_VALUE so that all partitions in the 2 nodes are drained: node1 => [tp1, tp2], node2 => [tp3, tp4]
+        Map<Integer, List<ProducerBatch>> batches5 = accum.drain(cluster, new HashSet<Node>(Arrays.asList(node1, node2)), Integer.MAX_VALUE, 0);
+        verifyTopicPartitionInBatches(batches5, tp1, tp2, tp3, tp4);
     }
 
-    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> batches, TopicPartition... tp) {
-        assertEquals(tp.length, batches.size());
+    private void verifyTopicPartitionInBatches(Map<Integer, List<ProducerBatch>> nodeBatches, TopicPartition... tp) {
+        int allTpBatchCount = nodeBatches.values().stream().flatMap(Collection::stream).collect(Collectors.toList()).size();
+        assertEquals(tp.length, allTpBatchCount);
         List<TopicPartition> topicPartitionsInBatch = new ArrayList<TopicPartition>();
-        for (Map.Entry<Integer, List<ProducerBatch>> entry : batches.entrySet()) {
-            List<ProducerBatch> batchList = entry.getValue();
-            assertEquals(1, batchList.size());
-            topicPartitionsInBatch.add(batchList.get(0).topicPartition);
+        for (Map.Entry<Integer, List<ProducerBatch>> entry : nodeBatches.entrySet()) {
+            List<ProducerBatch> tpBatchList = entry.getValue();
+            List<TopicPartition> tpList = tpBatchList.stream().map(producerBatch -> producerBatch.topicPartition).collect(Collectors.toList());
+            topicPartitionsInBatch.addAll(tpList);
         }
 
         for (int i = 0; i < tp.length; i++) {