You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/08/24 11:04:14 UTC

[pulsar] branch master updated: [pulsar-test] enable pooled netty allocator for unit-tests (#11753)

This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b77a84  [pulsar-test] enable pooled netty allocator for unit-tests (#11753)
4b77a84 is described below

commit 4b77a8487e4d3c3ce17ca895f9a462f1730b2566
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Aug 24 04:03:30 2021 -0700

    [pulsar-test] enable pooled netty allocator for unit-tests (#11753)
    
    Fixes #11750
    
    * [pulsar-test] enable pooled netty allocator for unit-tests
    
    * Make TransactionMarkerDeleteTest work with pooled allocator
    
    * Revisit fix for making TransactionMarkerDeleteTest work with pooled allocator
    
    Co-authored-by: Lari Hotari <lh...@apache.org>
---
 pom.xml                                            |  2 +-
 .../service/TransactionMarkerDeleteTest.java       | 45 ++++++++++++----------
 2 files changed, 26 insertions(+), 21 deletions(-)

diff --git a/pom.xml b/pom.xml
index c103656..f07d2db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1312,7 +1312,7 @@ flexible messaging model and an intuitive client API.</description>
         <artifactId>maven-surefire-plugin</artifactId>
         <configuration>
           <argLine> -Xmx1G -XX:+UseG1GC
-            -Dpulsar.allocator.pooled=false
+            -Dpulsar.allocator.pooled=true
             -Dpulsar.allocator.leak_detection=Advanced
             -Dpulsar.allocator.exit_on_oom=false
             -Dio.netty.tryReflectionSetAccessible=true
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
index d6e9c73..f25b346 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
@@ -26,12 +26,10 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
 import org.apache.bookkeeper.mledger.ManagedCursor;
 import org.apache.bookkeeper.mledger.ManagedLedger;
 import org.apache.bookkeeper.mledger.Position;
@@ -55,7 +53,7 @@ import org.testng.annotations.Test;
 import org.testng.collections.Sets;
 
 @Test(groups = "broker")
-public class TransactionMarkerDeleteTest extends BrokerTestBase{
+public class TransactionMarkerDeleteTest extends BrokerTestBase {
 
     @BeforeMethod
     @Override
@@ -96,7 +94,6 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
     }
 
 
-
     @Test
     public void testMarkerDelete() throws Exception {
 
@@ -108,7 +105,7 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
         ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
 
         payload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
-                    msgMetadata, payload);
+                msgMetadata, payload);
 
         ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test");
         PersistentTopic topic = mock(PersistentTopic.class);
@@ -119,15 +116,16 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
         PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test",
                 managedLedger.openCursor("test"), false);
 
-        Position position1 = managedLedger.addEntry(payload.array());
-        Position markerPosition1 = managedLedger.addEntry(Markers
-                .newTxnCommitMarker(1, 1, 1).array());
+        byte[] payloadBytes = toBytes(payload);
+        Position position1 = managedLedger.addEntry(payloadBytes);
+        Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
+                .newTxnCommitMarker(1, 1, 1)));
 
-        Position position2 = managedLedger.addEntry(payload.array());
-        Position markerPosition2 = managedLedger.addEntry(Markers
-                .newTxnAbortMarker(1, 1, 1).array());
+        Position position2 = managedLedger.addEntry(payloadBytes);
+        Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
+                .newTxnAbortMarker(1, 1, 1)));
 
-        Position position3 = managedLedger.addEntry(payload.array());
+        Position position3 = managedLedger.addEntry(payloadBytes);
 
         assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
         assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
@@ -150,14 +148,14 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
                         .compareTo((PositionImpl) markerPosition2) == 0);
 
         // add consequent marker
-        managedLedger.addEntry(Markers
-                .newTxnCommitMarker(1, 1, 1).array());
+        managedLedger.addEntry(toBytes(Markers
+                .newTxnCommitMarker(1, 1, 1)));
 
-        managedLedger.addEntry(Markers
-                .newTxnAbortMarker(1, 1, 1).array());
+        managedLedger.addEntry(toBytes(Markers
+                .newTxnAbortMarker(1, 1, 1)));
 
-        Position markerPosition3 = managedLedger.addEntry(Markers
-                .newTxnAbortMarker(1, 1, 1).array());
+        Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
+                .newTxnAbortMarker(1, 1, 1)));
 
         // ack with transaction, then commit this transaction
         persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0),
@@ -171,4 +169,11 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
                         .compareTo((PositionImpl) markerPosition3) == 0);
 
     }
+
+    static byte[] toBytes(ByteBuf byteBuf) {
+        byte[] buf = new byte[byteBuf.readableBytes()];
+        byteBuf.readBytes(buf);
+        byteBuf.release();
+        return buf;
+    }
 }