You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2018/01/25 21:55:28 UTC

[GitHub] merlimat closed pull request #1110: Fix ByteBufPair encoder to release buffers

merlimat closed pull request #1110: Fix ByteBufPair encoder to release buffers
URL: https://github.com/apache/incubator-pulsar/pull/1110
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

Because this pull request originates from a fork, GitHub will not show
its diff after the merge, so the full diff is reproduced below:

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
index f25c85ebc..e6d649834 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ReplicatorTest.java
@@ -92,7 +92,7 @@ void shutdown() throws Exception {
         super.shutdown();
     }
 
-    @Test(enabled = true)
+    @Test(enabled = true, timeOut = 30000)
     public void testConfigChange() throws Exception {
         log.info("--- Starting ReplicatorTest::testConfigChange ---");
         // This test is to verify that the config change on global namespace is successfully applied in broker during
@@ -177,7 +177,7 @@ public Void call() throws Exception {
         // Case 3: TODO: Once automatic cleanup is implemented, add tests case to verify auto removal of clusters
     }
 
-    @Test
+    @Test(timeOut = 30000)
     public void testConcurrentReplicator() throws Exception {
 
         log.info("--- Starting ReplicatorTest::testConcurrentReplicator ---");
@@ -220,7 +220,7 @@ public void testConcurrentReplicator() throws Exception {
 
     }
 
-    @Test(enabled = false)
+    @Test(enabled = false, timeOut = 30000)
     public void testConfigChangeNegativeCases() throws Exception {
         log.info("--- Starting ReplicatorTest::testConfigChangeNegativeCases ---");
         // Negative test cases for global namespace config change. Verify that the namespace config change can not be
@@ -254,7 +254,7 @@ public void testConfigChangeNegativeCases() throws Exception {
         ownerCache.tryAcquiringOwnership(globalNsBundle);
     }
 
-    @Test(enabled = true)
+    @Test(enabled = true, timeOut = 30000)
     public void testReplication() throws Exception {
 
         log.info("--- Starting ReplicatorTest::testReplication ---");
@@ -355,7 +355,7 @@ public Void call() throws Exception {
         }
     }
 
-    @Test(enabled = false)
+    @Test(enabled = false, timeOut = 30000)
     public void testReplicationOverrides() throws Exception {
 
         log.info("--- Starting ReplicatorTest::testReplicationOverrides ---");
@@ -438,7 +438,7 @@ public Void call() throws Exception {
         }
     }
 
-    @Test(enabled = true)
+    @Test(enabled = true, timeOut = 30000)
     public void testFailures() throws Exception {
 
         log.info("--- Starting ReplicatorTest::testFailures ---");
@@ -459,7 +459,7 @@ public void testFailures() throws Exception {
 
     }
 
-    @Test
+    @Test(timeOut = 30000)
     public void testReplicatePeekAndSkip() throws Exception {
 
         SortedSet<String> testDests = new TreeSet<String>();
@@ -482,7 +482,7 @@ public void testReplicatePeekAndSkip() throws Exception {
         consumer1.close();
     }
 
-    @Test
+    @Test(timeOut = 30000)
     public void testReplicatorClearBacklog() throws Exception {
 
         // This test is to verify that reset cursor fails on global topic
@@ -509,7 +509,7 @@ public void testReplicatorClearBacklog() throws Exception {
         consumer1.close();
     }
 
-    @Test(enabled = true)
+    @Test(enabled = true, timeOut = 30000)
     public void testResetCursorNotFail() throws Exception {
 
         log.info("--- Starting ReplicatorTest::testResetCursorNotFail ---");
@@ -555,7 +555,7 @@ public Void call() throws Exception {
         admin1.persistentTopics().resetCursor(testDests.first(), "sub-id", System.currentTimeMillis());
     }
 
-    @Test(enabled = true)
+    @Test(enabled = true, timeOut = 30000)
     public void testReplicationForBatchMessages() throws Exception {
 
         log.info("--- Starting ReplicatorTest::testReplicationForBatchMessages ---");
@@ -629,7 +629,7 @@ public Void call() throws Exception {
      *
      * @throws Exception
      */
-    @Test
+    @Test(timeOut = 30000)
     public void testDeleteReplicatorFailure() throws Exception {
         log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
         final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
@@ -666,7 +666,7 @@ public void deleteCursorFailed(ManagedLedgerException exception, Object ctx) {
         });
     }
 
-    @Test(priority = 5)
+    @Test(priority = 5, timeOut = 30000)
     public void testReplicatorProducerClosing() throws Exception {
         log.info("--- Starting ReplicatorTest::testDeleteReplicatorFailure ---");
         final String topicName = "persistent://pulsar/global/ns/repltopicbatch";
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
index bab0a5afb..297b56276 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/ByteBufPair.java
@@ -29,6 +29,7 @@
 import io.netty.util.AbstractReferenceCounted;
 import io.netty.util.Recycler;
 import io.netty.util.Recycler.Handle;
+import io.netty.util.ReferenceCountUtil;
 import io.netty.util.ReferenceCounted;
 
 /**
@@ -119,8 +120,12 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
                 // Write each buffer individually on the socket. The retain() here is needed to preserve the fact that
                 // ByteBuf are automatically released after a write. If the ByteBufPair ref count is increased and it
                 // gets written multiple times, the individual buffers refcount should be reflected as well.
-                ctx.write(b.getFirst().retain(), ctx.voidPromise());
-                ctx.write(b.getSecond().retain(), promise);
+                try {
+                    ctx.write(b.getFirst().retain(), ctx.voidPromise());
+                    ctx.write(b.getSecond().retain(), promise);
+                } finally {
+                    ReferenceCountUtil.safeRelease(b);
+                }
             } else {
                 ctx.write(msg, promise);
             }
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java
index cff92c8cc..92efb4f58 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/api/ByteBufPairTest.java
@@ -18,12 +18,17 @@
  */
 package org.apache.pulsar.common.api;
 
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 
-import org.apache.pulsar.common.api.ByteBufPair;
 import org.testng.annotations.Test;
+
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.PooledByteBufAllocator;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
 
 public class ByteBufPairTest {
 
@@ -50,4 +55,32 @@ public void testDoubleByteBuf() throws Exception {
         assertEquals(b2.refCnt(), 0);
     }
 
+    @Test
+    public void testEncoder() throws Exception {
+        ByteBuf b1 = Unpooled.wrappedBuffer("hello".getBytes());
+        ByteBuf b2 = Unpooled.wrappedBuffer("world".getBytes());
+        ByteBufPair buf = ByteBufPair.get(b1, b2);
+
+        assertEquals(buf.readableBytes(), 10);
+        assertEquals(buf.getFirst(), b1);
+        assertEquals(buf.getSecond(), b2);
+
+        assertEquals(buf.refCnt(), 1);
+        assertEquals(b1.refCnt(), 1);
+        assertEquals(b2.refCnt(), 1);
+
+        ChannelHandlerContext ctx = mock(ChannelHandlerContext.class);
+        when(ctx.write(any(), any())).then(invocation -> {
+            // Simulate a write on the context which releases the buffer
+            ((ByteBuf) invocation.getArguments()[0]).release();
+            return null;
+        });
+
+        ByteBufPair.ENCODER.write(ctx, buf, null);
+
+        assertEquals(buf.refCnt(), 0);
+        assertEquals(b1.refCnt(), 0);
+        assertEquals(b2.refCnt(), 0);
+    }
+
 }


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services