You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by lh...@apache.org on 2021/08/24 11:04:14 UTC
[pulsar] branch master updated: [pulsar-test] enable pooled netty allocator for unit-tests (#11753)
This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 4b77a84 [pulsar-test] enable pooled netty allocator for unit-tests (#11753)
4b77a84 is described below
commit 4b77a8487e4d3c3ce17ca895f9a462f1730b2566
Author: Rajan Dhabalia <rd...@apache.org>
AuthorDate: Tue Aug 24 04:03:30 2021 -0700
[pulsar-test] enable pooled netty allocator for unit-tests (#11753)
Fixes #11750
* [pulsar-test] enable pooled netty allocator for unit-tests
* Make TransactionMarkerDeleteTest work with pooled allocator
* Revisit fix for making TransactionMarkerDeleteTest work with pooled allocator
Co-authored-by: Lari Hotari <lh...@apache.org>
---
pom.xml | 2 +-
.../service/TransactionMarkerDeleteTest.java | 45 ++++++++++++----------
2 files changed, 26 insertions(+), 21 deletions(-)
diff --git a/pom.xml b/pom.xml
index c103656..f07d2db 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1312,7 +1312,7 @@ flexible messaging model and an intuitive client API.</description>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine> -Xmx1G -XX:+UseG1GC
- -Dpulsar.allocator.pooled=false
+ -Dpulsar.allocator.pooled=true
-Dpulsar.allocator.leak_detection=Advanced
-Dpulsar.allocator.exit_on_oom=false
-Dio.netty.tryReflectionSetAccessible=true
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
index d6e9c73..f25b346 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TransactionMarkerDeleteTest.java
@@ -26,12 +26,10 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
-import io.netty.buffer.Unpooled;
+import java.util.Collections;
+import java.util.concurrent.TimeUnit;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.Position;
@@ -55,7 +53,7 @@ import org.testng.annotations.Test;
import org.testng.collections.Sets;
@Test(groups = "broker")
-public class TransactionMarkerDeleteTest extends BrokerTestBase{
+public class TransactionMarkerDeleteTest extends BrokerTestBase {
@BeforeMethod
@Override
@@ -96,7 +94,6 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
}
-
@Test
public void testMarkerDelete() throws Exception {
@@ -108,7 +105,7 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
ByteBuf payload = PooledByteBufAllocator.DEFAULT.buffer(0);
payload = Commands.serializeMetadataAndPayload(Commands.ChecksumType.Crc32c,
- msgMetadata, payload);
+ msgMetadata, payload);
ManagedLedger managedLedger = pulsar.getManagedLedgerFactory().open("test");
PersistentTopic topic = mock(PersistentTopic.class);
@@ -119,15 +116,16 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
PersistentSubscription persistentSubscription = new PersistentSubscription(topic, "test",
managedLedger.openCursor("test"), false);
- Position position1 = managedLedger.addEntry(payload.array());
- Position markerPosition1 = managedLedger.addEntry(Markers
- .newTxnCommitMarker(1, 1, 1).array());
+ byte[] payloadBytes = toBytes(payload);
+ Position position1 = managedLedger.addEntry(payloadBytes);
+ Position markerPosition1 = managedLedger.addEntry(toBytes(Markers
+ .newTxnCommitMarker(1, 1, 1)));
- Position position2 = managedLedger.addEntry(payload.array());
- Position markerPosition2 = managedLedger.addEntry(Markers
- .newTxnAbortMarker(1, 1, 1).array());
+ Position position2 = managedLedger.addEntry(payloadBytes);
+ Position markerPosition2 = managedLedger.addEntry(toBytes(Markers
+ .newTxnAbortMarker(1, 1, 1)));
- Position position3 = managedLedger.addEntry(payload.array());
+ Position position3 = managedLedger.addEntry(payloadBytes);
assertEquals(cursor.getNumberOfEntriesInBacklog(true), 5);
assertTrue(((PositionImpl) cursor.getMarkDeletedPosition()).compareTo((PositionImpl) position1) < 0);
@@ -150,14 +148,14 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
.compareTo((PositionImpl) markerPosition2) == 0);
// add consequent marker
- managedLedger.addEntry(Markers
- .newTxnCommitMarker(1, 1, 1).array());
+ managedLedger.addEntry(toBytes(Markers
+ .newTxnCommitMarker(1, 1, 1)));
- managedLedger.addEntry(Markers
- .newTxnAbortMarker(1, 1, 1).array());
+ managedLedger.addEntry(toBytes(Markers
+ .newTxnAbortMarker(1, 1, 1)));
- Position markerPosition3 = managedLedger.addEntry(Markers
- .newTxnAbortMarker(1, 1, 1).array());
+ Position markerPosition3 = managedLedger.addEntry(toBytes(Markers
+ .newTxnAbortMarker(1, 1, 1)));
// ack with transaction, then commit this transaction
persistentSubscription.transactionIndividualAcknowledge(new TxnID(0, 0),
@@ -171,4 +169,11 @@ public class TransactionMarkerDeleteTest extends BrokerTestBase{
.compareTo((PositionImpl) markerPosition3) == 0);
}
+
+ static byte[] toBytes(ByteBuf byteBuf) {
+ byte[] buf = new byte[byteBuf.readableBytes()];
+ byteBuf.readBytes(buf);
+ byteBuf.release();
+ return buf;
+ }
}