You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/09/08 14:14:18 UTC
svn commit: r1759829 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/re...
Author: orudyy
Date: Thu Sep 8 14:14:18 2016
New Revision: 1759829
URL: http://svn.apache.org/viewvc?rev=1759829&view=rev
Log:
QPID-7408: [Java Broker, WMC] Fix limiting of compressed content
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java?rev=1759829&r1=1759828&r2=1759829&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java Thu Sep 8 14:14:18 2016
@@ -25,4 +25,6 @@ import java.io.InputStream;
public interface StreamingContent
{
InputStream getInputStream();
+
+ long getLimit();
}
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1759829&r1=1759828&r2=1759829&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Sep 8 14:14:18 2016
@@ -2693,14 +2693,15 @@ public abstract class AbstractQueue<X ex
MessageContent(MessageReference<?> messageReference, long limit)
{
_messageReference = messageReference;
- _limit = (limit == UNLIMITED ? messageReference.getMessage().getSize() : limit);
+ _limit = limit;
}
@Override
public void write(OutputStream outputStream) throws IOException
{
ServerMessage message = _messageReference.getMessage();
- Collection<QpidByteBuffer> content = message.getContent(0, (int) _limit);
+ int length = (int) (_limit == UNLIMITED ? message.getSize() : _limit);
+ Collection<QpidByteBuffer> content = message.getContent(0, length);
try
{
for (QpidByteBuffer b : content)
@@ -2764,7 +2765,7 @@ public abstract class AbstractQueue<X ex
public InputStream getInputStream()
{
ServerMessage message = _messageReference.getMessage();
- final Collection<QpidByteBuffer> content = message.getContent(0, (int) _limit);
+ final Collection<QpidByteBuffer> content = message.getContent(0, (int) message.getSize());
Collection<InputStream> streams = new ArrayList<>(content.size());
for (QpidByteBuffer b : content)
{
@@ -2789,6 +2790,12 @@ public abstract class AbstractQueue<X ex
}
};
}
+
+ @Override
+ public long getLimit()
+ {
+ return _limit;
+ }
}
private static class AcquireAllQueueEntryFilter implements QueueEntryFilter
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java?rev=1759829&r1=1759828&r2=1759829&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java Thu Sep 8 14:14:18 2016
@@ -331,13 +331,19 @@ public abstract class AbstractServlet ex
Map<String, Object> headers = getResponseHeaders(content);
boolean isGzipCompressed = GZIP_CONTENT_ENCODING.equals(headers.get(CONTENT_ENCODING_HEADER.toUpperCase()));
boolean isCompressing = HttpManagementUtil.isCompressing(request, _managementConfiguration);
- try (OutputStream os = isGzipCompressed ? response.getOutputStream() : getOutputStream(request, response))
+
+ if (isGzipCompressed && content instanceof StreamingContent
+ && ((((StreamingContent) content).getLimit() >= 0 || !isCompressing)))
+ {
+ headers.remove(CONTENT_ENCODING_HEADER);
+ content = new DecompressingContent((StreamingContent) content);
+ isGzipCompressed = false;
+ }
+
+ try (OutputStream os = isGzipCompressed && isCompressing
+ ? response.getOutputStream()
+ : getOutputStream(request, response))
{
- if (isGzipCompressed && !isCompressing && content instanceof StreamingContent)
- {
- headers.remove(CONTENT_ENCODING_HEADER);
- content = new DecompressingContent((StreamingContent) content);
- }
response.setStatus(HttpServletResponse.SC_OK);
for (Map.Entry<String, Object> entry : headers.entrySet())
{
@@ -428,9 +434,11 @@ public abstract class AbstractServlet ex
@Override
public void write(final OutputStream os) throws IOException
{
- try(GZIPInputStream gzipInputStream = new GZIPInputStream(_content.getInputStream());)
+ try (GZIPInputStream gzipInputStream = new GZIPInputStream(_content.getInputStream());)
{
- ByteStreams.copy(gzipInputStream, os);
+ ByteStreams.copy(_content.getLimit() >= 0
+ ? ByteStreams.limit(gzipInputStream, _content.getLimit())
+ : gzipInputStream, os);
}
}
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java?rev=1759829&r1=1759828&r2=1759829&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java Thu Sep 8 14:14:18 2016
@@ -203,6 +203,10 @@ public class MessageCompressionTest exte
byte[] messageBytes = _restTestHelper.getBytes(queueRelativePath + "/getMessageContent?messageId=" + id);
String content = new String(messageBytes, StandardCharsets.UTF_8);
assertEquals("Unexpected message content :" + content, messageText, content);
+
+ messageBytes = _restTestHelper.getBytes(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id);
+ content = new String(messageBytes, StandardCharsets.UTF_8);
+ assertEquals("Unexpected message content :" + content, messageText.substring(0, 1024), content);
}
public void testGetContentViaRestForCompressedMessageWithAgentSupportingCompression() throws Exception
@@ -226,11 +230,22 @@ public class MessageCompressionTest exte
long id = ((Number) messages.get(0).get("id")).longValue();
_restTestHelper.setAcceptEncoding("gzip, deflate, br");
- HttpURLConnection connection =
- _restTestHelper.openManagementConnection(queueRelativePath + "/getMessageContent?messageId=" + id,
- "GET");
+ String content = getDecompressedContent(queueRelativePath + "/getMessageContent?messageId=" + id);
+ assertEquals("Unexpected message content :" + content, messageText, content);
+
+ content = getDecompressedContent(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id);
+ assertEquals("Unexpected message content :" + content, messageText.substring(0, 1024), content);
+ }
+
+ private String getDecompressedContent(final String url) throws IOException
+ {
+ HttpURLConnection connection = _restTestHelper.openManagementConnection(url, "GET");
connection.connect();
+ return decompressInputStream(connection);
+ }
+ private String decompressInputStream(final HttpURLConnection connection) throws IOException
+ {
String content;
try (InputStream is = new GZIPInputStream(connection.getInputStream());
ByteArrayOutputStream baos = new ByteArrayOutputStream())
@@ -243,7 +258,7 @@ public class MessageCompressionTest exte
}
content = new String(baos.toByteArray(), StandardCharsets.UTF_8);
}
- assertEquals("Unexpected message content :" + content, messageText, content);
+ return content;
}
private void publishMessage(final Connection senderConnection, final String messageText)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org