You are viewing a plain text version of this content. The hyperlink to the canonical version is not preserved in this plain text copy.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/29 13:14:12 UTC

svn commit: r1762764 - in /qpid/java/trunk: broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ common/src/main/java/org/apache/qpid/bytebuffer/

Author: lquack
Date: Thu Sep 29 13:14:12 2016
New Revision: 1762764

URL: http://svn.apache.org/viewvc?rev=1762764&view=rev
Log:
QPID-7440: [Java Broker] Stop ProtocolOutputConverterImpl from leaking QpidByteBuffers

Modified:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1762764&r1=1762763&r2=1762764&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Thu Sep 29 13:14:12 2016
@@ -150,29 +150,45 @@ public class ProtocolOutputConverterImpl
 
     private DisposableMessageContentSource deflateIfPossible(MessageContentSource source)
     {
+        Collection<QpidByteBuffer> contentBuffers = source.getContent(0, (int) source.getSize());
         try
         {
-            return new ModifiedContentSource(QpidByteBuffer.deflate(source.getContent(0, (int) source.getSize())));
+            return new ModifiedContentSource(QpidByteBuffer.deflate(contentBuffers));
         }
         catch (IOException e)
         {
             LOGGER.warn("Unable to compress message payload for consumer with gzip, message will be sent as is", e);
             return null;
         }
+        finally
+        {
+            for (QpidByteBuffer contentBuffer : contentBuffers)
+            {
+                contentBuffer.dispose();
+            }
+        }
     }
 
 
     private DisposableMessageContentSource inflateIfPossible(MessageContentSource source)
     {
+        Collection<QpidByteBuffer> contentBuffers = source.getContent(0, (int) source.getSize());
         try
         {
-            return new ModifiedContentSource(QpidByteBuffer.inflate(source.getContent(0, (int) source.getSize())));
+            return new ModifiedContentSource(QpidByteBuffer.inflate(contentBuffers));
         }
         catch (IOException e)
         {
             LOGGER.warn("Unable to decompress message payload for consumer with gzip, message will be sent as is", e);
             return null;
         }
+        finally
+        {
+            for (QpidByteBuffer contentBuffer : contentBuffers)
+            {
+                contentBuffer.dispose();
+            }
+        }
     }
 
 

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1762764&r1=1762763&r2=1762764&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Thu Sep 29 13:14:12 2016
@@ -631,6 +631,14 @@ public final class QpidByteBuffer
             }
             return uncompressedBuffers;
         }
+        catch (IOException e)
+        {
+            for (QpidByteBuffer uncompressedBuffer : uncompressedBuffers)
+            {
+                uncompressedBuffer.dispose();
+            }
+            throw e;
+        }
     }
 
     public static Collection<QpidByteBuffer> deflate(Collection<QpidByteBuffer> uncompressedBuffers) throws IOException
@@ -649,9 +657,8 @@ public final class QpidByteBuffer
         }
         final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
 
-        QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
-
-        try(InputStream compressedInput = new CompositeInputStream(streams);
+        try(QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
+            InputStream compressedInput = new CompositeInputStream(streams);
             GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput, bufferSize)))
         {
             byte[] buf = new byte[16384];
@@ -660,11 +667,10 @@ public final class QpidByteBuffer
             {
                 gzipStream.write(buf, 0, read);
             }
+            gzipStream.finish();
+            gzipStream.flush();
+            return compressedOutput.fetchAccumulatedBuffers();
         }
-
-        // output pipeline will be already flushed and closed
-
-        return compressedOutput.fetchAccumulatedBuffers();
     }
 
     public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> buffers) throws IOException

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java?rev=1762764&r1=1762763&r2=1762764&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java Thu Sep 29 13:14:12 2016
@@ -78,6 +78,11 @@ public class QpidByteBufferOutputStream
     public void close() throws IOException
     {
         _closed = true;
+        for (QpidByteBuffer buffer : _buffers)
+        {
+            buffer.dispose();
+        }
+        _buffers.clear();
     }
 
     public Collection<QpidByteBuffer> fetchAccumulatedBuffers()



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org