You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/09/08 14:14:18 UTC

svn commit: r1759829 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/ systests/src/test/java/org/apache/qpid/systest/

Author: orudyy
Date: Thu Sep  8 14:14:18 2016
New Revision: 1759829

URL: http://svn.apache.org/viewvc?rev=1759829&view=rev
Log:
QPID-7408: [Java Broker, WMC] Fix limiting of compressed content

Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java?rev=1759829&r1=1759828&r2=1759829&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java Thu Sep  8 14:14:18 2016
@@ -25,4 +25,6 @@ import java.io.InputStream;
 public interface StreamingContent
 {
     InputStream getInputStream();
+
+    long getLimit();
 }

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1759829&r1=1759828&r2=1759829&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Thu Sep  8 14:14:18 2016
@@ -2693,14 +2693,15 @@ public abstract class AbstractQueue<X ex
         MessageContent(MessageReference<?> messageReference, long limit)
         {
             _messageReference = messageReference;
-            _limit = (limit == UNLIMITED ? messageReference.getMessage().getSize() : limit);
+            _limit = limit;
         }
 
         @Override
         public void write(OutputStream outputStream) throws IOException
         {
             ServerMessage message = _messageReference.getMessage();
-            Collection<QpidByteBuffer> content = message.getContent(0, (int) _limit);
+            int length = (int) (_limit == UNLIMITED ? message.getSize() : _limit);
+            Collection<QpidByteBuffer> content = message.getContent(0, length);
             try
             {
                 for (QpidByteBuffer b : content)
@@ -2764,7 +2765,7 @@ public abstract class AbstractQueue<X ex
         public InputStream getInputStream()
         {
             ServerMessage message = _messageReference.getMessage();
-            final Collection<QpidByteBuffer> content = message.getContent(0, (int) _limit);
+            final Collection<QpidByteBuffer> content = message.getContent(0, (int) message.getSize());
             Collection<InputStream> streams = new ArrayList<>(content.size());
             for (QpidByteBuffer b : content)
             {
@@ -2789,6 +2790,12 @@ public abstract class AbstractQueue<X ex
                 }
             };
         }
+
+        @Override
+        public long getLimit()
+        {
+            return _limit;
+        }
     }
 
     private static class AcquireAllQueueEntryFilter implements QueueEntryFilter

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java?rev=1759829&r1=1759828&r2=1759829&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java Thu Sep  8 14:14:18 2016
@@ -331,13 +331,19 @@ public abstract class AbstractServlet ex
         Map<String, Object> headers = getResponseHeaders(content);
         boolean isGzipCompressed = GZIP_CONTENT_ENCODING.equals(headers.get(CONTENT_ENCODING_HEADER.toUpperCase()));
         boolean isCompressing = HttpManagementUtil.isCompressing(request, _managementConfiguration);
-        try (OutputStream os = isGzipCompressed ? response.getOutputStream() : getOutputStream(request, response))
+
+        if (isGzipCompressed && content instanceof StreamingContent
+            && ((((StreamingContent) content).getLimit() >= 0 || !isCompressing)))
+        {
+            headers.remove(CONTENT_ENCODING_HEADER);
+            content = new DecompressingContent((StreamingContent) content);
+            isGzipCompressed = false;
+        }
+
+        try (OutputStream os = isGzipCompressed && isCompressing
+                ? response.getOutputStream()
+                : getOutputStream(request, response))
         {
-            if (isGzipCompressed && !isCompressing && content instanceof StreamingContent)
-            {
-                headers.remove(CONTENT_ENCODING_HEADER);
-                content = new DecompressingContent((StreamingContent) content);
-            }
             response.setStatus(HttpServletResponse.SC_OK);
             for (Map.Entry<String, Object> entry : headers.entrySet())
             {
@@ -428,9 +434,11 @@ public abstract class AbstractServlet ex
         @Override
         public void write(final OutputStream os) throws IOException
         {
-            try(GZIPInputStream gzipInputStream = new GZIPInputStream(_content.getInputStream());)
+            try (GZIPInputStream gzipInputStream = new GZIPInputStream(_content.getInputStream());)
             {
-                ByteStreams.copy(gzipInputStream, os);
+                ByteStreams.copy(_content.getLimit() >= 0
+                                         ? ByteStreams.limit(gzipInputStream, _content.getLimit())
+                                         : gzipInputStream, os);
             }
         }
 

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java?rev=1759829&r1=1759828&r2=1759829&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java Thu Sep  8 14:14:18 2016
@@ -203,6 +203,10 @@ public class MessageCompressionTest exte
         byte[] messageBytes = _restTestHelper.getBytes(queueRelativePath + "/getMessageContent?messageId=" + id);
         String content = new String(messageBytes, StandardCharsets.UTF_8);
         assertEquals("Unexpected message content :" + content, messageText, content);
+
+        messageBytes = _restTestHelper.getBytes(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id);
+        content = new String(messageBytes, StandardCharsets.UTF_8);
+        assertEquals("Unexpected message content :" + content, messageText.substring(0, 1024), content);
     }
 
     public void testGetContentViaRestForCompressedMessageWithAgentSupportingCompression() throws Exception
@@ -226,11 +230,22 @@ public class MessageCompressionTest exte
         long id = ((Number) messages.get(0).get("id")).longValue();
 
         _restTestHelper.setAcceptEncoding("gzip, deflate, br");
-        HttpURLConnection connection =
-                _restTestHelper.openManagementConnection(queueRelativePath + "/getMessageContent?messageId=" + id,
-                                                         "GET");
+        String content = getDecompressedContent(queueRelativePath + "/getMessageContent?messageId=" + id);
+        assertEquals("Unexpected message content :" + content, messageText, content);
+
+        content = getDecompressedContent(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id);
+        assertEquals("Unexpected message content :" + content, messageText.substring(0, 1024), content);
+    }
+
+    private String getDecompressedContent(final String url) throws IOException
+    {
+        HttpURLConnection connection = _restTestHelper.openManagementConnection(url, "GET");
         connection.connect();
+        return decompressInputStream(connection);
+    }
 
+    private String decompressInputStream(final HttpURLConnection connection) throws IOException
+    {
         String content;
         try (InputStream is = new GZIPInputStream(connection.getInputStream());
              ByteArrayOutputStream baos = new ByteArrayOutputStream())
@@ -243,7 +258,7 @@ public class MessageCompressionTest exte
             }
             content = new String(baos.toByteArray(), StandardCharsets.UTF_8);
         }
-        assertEquals("Unexpected message content :" + content, messageText, content);
+        return content;
     }
 
     private void publishMessage(final Connection senderConnection, final String messageText)



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org