You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/29 13:14:12 UTC
svn commit: r1762764 - in /qpid/java/trunk:
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
common/src/main/java/org/apache/qpid/bytebuffer/
Author: lquack
Date: Thu Sep 29 13:14:12 2016
New Revision: 1762764
URL: http://svn.apache.org/viewvc?rev=1762764&view=rev
Log:
QPID-7440: [Java Broker] Stop ProtocolOutputConverterImpl from leaking QpidByteBuffers
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1762764&r1=1762763&r2=1762764&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Thu Sep 29 13:14:12 2016
@@ -150,29 +150,45 @@ public class ProtocolOutputConverterImpl
private DisposableMessageContentSource deflateIfPossible(MessageContentSource source)
{
+ Collection<QpidByteBuffer> contentBuffers = source.getContent(0, (int) source.getSize());
try
{
- return new ModifiedContentSource(QpidByteBuffer.deflate(source.getContent(0, (int) source.getSize())));
+ return new ModifiedContentSource(QpidByteBuffer.deflate(contentBuffers));
}
catch (IOException e)
{
LOGGER.warn("Unable to compress message payload for consumer with gzip, message will be sent as is", e);
return null;
}
+ finally
+ {
+ for (QpidByteBuffer contentBuffer : contentBuffers)
+ {
+ contentBuffer.dispose();
+ }
+ }
}
private DisposableMessageContentSource inflateIfPossible(MessageContentSource source)
{
+ Collection<QpidByteBuffer> contentBuffers = source.getContent(0, (int) source.getSize());
try
{
- return new ModifiedContentSource(QpidByteBuffer.inflate(source.getContent(0, (int) source.getSize())));
+ return new ModifiedContentSource(QpidByteBuffer.inflate(contentBuffers));
}
catch (IOException e)
{
LOGGER.warn("Unable to decompress message payload for consumer with gzip, message will be sent as is", e);
return null;
}
+ finally
+ {
+ for (QpidByteBuffer contentBuffer : contentBuffers)
+ {
+ contentBuffer.dispose();
+ }
+ }
}
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1762764&r1=1762763&r2=1762764&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Thu Sep 29 13:14:12 2016
@@ -631,6 +631,14 @@ public final class QpidByteBuffer
}
return uncompressedBuffers;
}
+ catch (IOException e)
+ {
+ for (QpidByteBuffer uncompressedBuffer : uncompressedBuffers)
+ {
+ uncompressedBuffer.dispose();
+ }
+ throw e;
+ }
}
public static Collection<QpidByteBuffer> deflate(Collection<QpidByteBuffer> uncompressedBuffers) throws IOException
@@ -649,9 +657,8 @@ public final class QpidByteBuffer
}
final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
- QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
-
- try(InputStream compressedInput = new CompositeInputStream(streams);
+ try(QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
+ InputStream compressedInput = new CompositeInputStream(streams);
GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput, bufferSize)))
{
byte[] buf = new byte[16384];
@@ -660,11 +667,10 @@ public final class QpidByteBuffer
{
gzipStream.write(buf, 0, read);
}
+ gzipStream.finish();
+ gzipStream.flush();
+ return compressedOutput.fetchAccumulatedBuffers();
}
-
- // output pipeline will be already flushed and closed
-
- return compressedOutput.fetchAccumulatedBuffers();
}
public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> buffers) throws IOException
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java?rev=1762764&r1=1762763&r2=1762764&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java Thu Sep 29 13:14:12 2016
@@ -78,6 +78,11 @@ public class QpidByteBufferOutputStream
public void close() throws IOException
{
_closed = true;
+ for (QpidByteBuffer buffer : _buffers)
+ {
+ buffer.dispose();
+ }
+ _buffers.clear();
}
public Collection<QpidByteBuffer> fetchAccumulatedBuffers()
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org