You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/27 11:59:09 UTC

svn commit: r1762460 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/ broker-pl...

Author: lquack
Date: Tue Sep 27 11:59:09 2016
New Revision: 1762460

URL: http://svn.apache.org/viewvc?rev=1762460&view=rev
Log:
QPID-7409: [Java Broker] Move responsibility to limit message content to ManagedOperation getMessageContent

* returnJson always returns uncompressed data
* new parameter decompressBeforeLimiting (default "false")

Added:
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
Removed:
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/LimitingOutputStream.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
    qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1762460&r1=1762459&r2=1762460&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Sep 27 11:59:09 2016
@@ -311,7 +311,11 @@ public interface Queue<X extends Queue<X
                               @Param(name = "returnJson", defaultValue = "false",
                                       description = "If true, converts message content into json format"
                                                     + " if message mime-type is either amqp/map or amqp/list"
-                                                    + " or jms/map-message. Default is false.") boolean returnJson);
+                                                    + " or jms/map-message. Default is false.") boolean returnJson,
+                              @Param(name = "decompressBeforeLimiting", defaultValue = "false",
+                                      description = "If true, the operation will attempt to decompress the message"
+                                                    + " (should it be compressed) before applying any limit. If"
+                                                    + " decompression fails the operation will fail.") boolean decompressBeforeLimiting);
 
     @ManagedOperation(nonModifying = true, paramRequiringSecure = "includeHeaders")
     List<MessageInfo> getMessageInfo(@Param(name = "first", defaultValue = "-1") int first,

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1762460&r1=1762459&r2=1762460&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Sep 27 11:59:09 2016
@@ -19,8 +19,10 @@
 package org.apache.qpid.server.queue;
 
 import static org.apache.qpid.server.util.ParameterizedTypes.MAP_OF_STRING_STRING;
+import static org.apache.qpid.util.GZIPUtils.GZIP_CONTENT_ENCODING;
 
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -31,6 +33,7 @@ import java.security.AccessController;
 import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
@@ -50,9 +53,12 @@ import java.util.concurrent.RejectedExec
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import javax.security.auth.Subject;
 
+import com.google.common.io.ByteStreams;
 import com.google.common.util.concurrent.FutureCallback;
 import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
@@ -60,7 +66,7 @@ import com.google.common.util.concurrent
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBufferInputStream;
 import org.apache.qpid.filter.SelectorParsingException;
 import org.apache.qpid.filter.selector.ParseException;
 import org.apache.qpid.filter.selector.TokenMgrError;
@@ -110,7 +116,6 @@ import org.apache.qpid.server.util.Serve
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
 import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.util.GZIPUtils;
 
 public abstract class AbstractQueue<X extends AbstractQueue<X>>
         extends AbstractConfiguredObject<X>
@@ -2690,26 +2695,54 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    abstract class BaseMessageContent implements Content
+    abstract class BaseMessageContent implements Content, CustomRestHeaders
     {
         public static final int UNLIMITED = -1;
         protected final MessageReference<?> _messageReference;
         protected final long _limit;
+        private final boolean _truncated;
 
         BaseMessageContent(MessageReference<?> messageReference, long limit)
         {
             _messageReference = messageReference;
             _limit = limit;
+            _truncated = limit >= 0 && _messageReference.getMessage().getSize() > limit;
         }
 
         @Override
-        public void release()
+        public final void release()
         {
             _messageReference.release();
         }
 
-        public abstract String getContentType();
+        protected boolean isTruncated()
+        {
+            return _truncated;
+        }
 
+        @SuppressWarnings("unused")
+        @RestContentHeader("X-Content-Truncated")
+        public String getContentTruncated()
+        {
+            return String.valueOf(isTruncated());
+        }
+
+        @SuppressWarnings("unused")
+        @RestContentHeader("Content-Type")
+        public String getContentType()
+        {
+            return _messageReference.getMessage().getMessageHeader().getMimeType();
+        }
+
+        @SuppressWarnings("unused")
+        @RestContentHeader("Content-Encoding")
+        public String getContentEncoding()
+        {
+            return _messageReference.getMessage().getMessageHeader().getEncoding();
+        }
+
+        @SuppressWarnings("unused")
+        @RestContentHeader("Content-Disposition")
         public String getContentDisposition()
         {
             try
@@ -2739,114 +2772,89 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    class JsonMessageContent extends BaseMessageContent implements CustomRestHeaders
+    class JsonMessageContent extends BaseMessageContent // serves the message body converted to JSON
     {
         private final InternalMessage _internalMessage;
-        private boolean _truncate = false;
 
         JsonMessageContent(MessageReference<?> messageReference, InternalMessage message, long limit)
         {
             super(messageReference, limit);
             _internalMessage = message;
-            _truncate = limit >= 0 && _messageReference.getMessage().getSize() > limit;
         }
 
         @Override
-        public void write(final OutputStream outputStream) throws IOException
+        public void write(OutputStream outputStream) throws IOException
         {
             Object messageBody = _internalMessage.getMessageBody();
-            new MessageContentJsonConverter(messageBody, _truncate ? _limit : UNLIMITED).convertAndWrite(outputStream);
+            new MessageContentJsonConverter(messageBody, isTruncated() ? _limit : UNLIMITED).convertAndWrite(outputStream); // limit is only passed on when isTruncated() reports truncation
         }
 
         @SuppressWarnings("unused")
-        @RestContentHeader("X-Content-Truncated")
-        public String getContentTruncated()
+        @Override
+        @RestContentHeader("Content-Encoding")
+        public String getContentEncoding()
         {
-            return String.valueOf(_truncate);
+            return "identity"; // reported as uncompressed regardless of the encoding in the message header
         }
 
         @SuppressWarnings("unused")
+        @Override
         @RestContentHeader("Content-Type")
         public String getContentType()
         {
             return "application/json";
         }
-
-        @SuppressWarnings("unused")
-        @RestContentHeader("Content-Disposition")
-        public String getContentDisposition()
-        {
-            return super.getContentDisposition();
-        }
     }
 
-    class MessageContent extends BaseMessageContent implements CustomRestHeaders
+    class MessageContent extends BaseMessageContent
     {
 
-        MessageContent(MessageReference<?> messageReference, long limit)
-        {
-            super(messageReference, limit);
-        }
+        private boolean _decompressBeforeLimiting; // set only when decompression was requested and the content is gzip-encoded
 
-        @Override
-        public void write(OutputStream outputStream) throws IOException
+        MessageContent(MessageReference<?> messageReference, long limit, boolean decompressBeforeLimiting)
         {
-            ServerMessage message = _messageReference.getMessage();
-            int length = (int) ((_limit == UNLIMITED || GZIPUtils.GZIP_CONTENT_ENCODING.equals(getContentEncoding())) ? message.getSize() : _limit);
-            Collection<QpidByteBuffer> content = message.getContent(0, length);
-            try
+            super(messageReference, limit);
+            if (decompressBeforeLimiting)
             {
-                for (QpidByteBuffer b : content)
+                String contentEncoding = getContentEncoding();
+                if (GZIP_CONTENT_ENCODING.equals(contentEncoding))
                 {
-                    int len = b.remaining();
-                    byte[] data = new byte[len];
-                    b.get(data);
-                    outputStream.write(data);
+                    _decompressBeforeLimiting = true;
                 }
-            }
-            finally
-            {
-                for (QpidByteBuffer b : content)
+                else if (contentEncoding != null && !"".equals(contentEncoding) && !"identity".equals(contentEncoding))
                 {
-                    b.dispose();
+                    throw new IllegalArgumentException(String.format(
+                            "Requested decompression of message with unknown compression '%s'", contentEncoding));
                 }
             }
         }
 
         @Override
-        public void release()
-        {
-            _messageReference.release();
-        }
-
-        @SuppressWarnings("unused")
-        @RestContentHeader("Content-Type")
-        public String getContentType()
+        public void write(OutputStream outputStream) throws IOException
         {
-            return _messageReference.getMessage().getMessageHeader().getMimeType();
-        }
+            ServerMessage message = _messageReference.getMessage();
 
-        @SuppressWarnings("unused")
-        @RestContentHeader("Content-Encoding")
-        public String getContentEncoding()
-        {
-            return _messageReference.getMessage().getMessageHeader().getEncoding();
-        }
+            int length = (int) ((_limit == UNLIMITED || _decompressBeforeLimiting) ? message.getSize() : Math.min(_limit, message.getSize())); // clamp so a huge limit cannot overflow the int cast
+            InputStream inputStream = new QpidByteBufferInputStream(message.getContent(0, length));
 
-        @SuppressWarnings("unused")
-        @RestContentHeader("Content-Disposition")
-        public String getContentDisposition()
-        {
-            return super.getContentDisposition();
-        }
+            if (_limit != UNLIMITED && _decompressBeforeLimiting)
+            {
+                inputStream = new GZIPInputStream(inputStream);
+                inputStream = ByteStreams.limit(inputStream, _limit);
+                outputStream = new GZIPOutputStream(outputStream); // re-compress: the Content-Encoding header still reports gzip
+            }
 
-        @SuppressWarnings("unused")
-        @RestContentHeader("X-Content-Limit")
-        public Long getContentLimit()
-        {
-            return _limit;
+            try
+            {
+                ByteStreams.copy(inputStream, outputStream);
+            }
+            finally
+            {
+                // Close the input first: it disposes the message buffers, which must happen even if closing the output fails.
+                inputStream.close();
+                outputStream.close();
+            }
         }
-
     }
 
     private static class AcquireAllQueueEntryFilter implements QueueEntryFilter
@@ -3613,13 +3621,13 @@ public abstract class AbstractQueue<X ex
     }
 
     @Override
-    public Content getMessageContent(final long messageId, final long limit, boolean returnJson)
+    public Content getMessageContent(final long messageId, final long limit, boolean returnJson, boolean decompressBeforeLimiting)
     {
         final MessageContentFinder messageFinder = new MessageContentFinder(messageId);
         visit(messageFinder);
         if (messageFinder.isFound())
         {
-            return createMessageContent(messageFinder.getMessageReference(), returnJson, limit);
+            return createMessageContent(messageFinder.getMessageReference(), returnJson, limit, decompressBeforeLimiting);
         }
         else
         {
@@ -3629,7 +3637,8 @@ public abstract class AbstractQueue<X ex
 
     private Content createMessageContent(final MessageReference<?> messageReference,
                                          final boolean returnJson,
-                                         final long limit)
+                                         final long limit,
+                                         final boolean decompressBeforeLimiting)
     {
         String mimeType = messageReference.getMessage().getMessageHeader().getMimeType();
         if (returnJson && ("amqp/list".equalsIgnoreCase(mimeType)
@@ -3653,7 +3662,7 @@ public abstract class AbstractQueue<X ex
                 }
             }
         }
-        return new MessageContent(messageReference, limit);
+        return new MessageContent(messageReference, limit, decompressBeforeLimiting);
     }
 
     @Override

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java?rev=1762460&r1=1762459&r2=1762460&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java Tue Sep 27 11:59:09 2016
@@ -50,7 +50,6 @@ import org.slf4j.LoggerFactory;
 import org.apache.qpid.server.management.plugin.GunzipOutputStream;
 import org.apache.qpid.server.management.plugin.HttpManagementConfiguration;
 import org.apache.qpid.server.management.plugin.HttpManagementUtil;
-import org.apache.qpid.server.management.plugin.LimitingOutputStream;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObjectJacksonModule;
 import org.apache.qpid.server.model.Content;
@@ -63,7 +62,6 @@ public abstract class AbstractServlet ex
     public static final int SC_UNPROCESSABLE_ENTITY = 422;
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServlet.class);
     public static final String CONTENT_DISPOSITION = "Content-disposition";
-    public static final String CONTENT_LIMIT = "X-Content-Limit";
 
     private Broker<?> _broker;
     private HttpManagementConfiguration _managementConfiguration;
@@ -350,35 +348,17 @@ public abstract class AbstractServlet ex
                                          final HttpServletResponse response,
                                          Map<String, Object> headers) throws IOException
     {
-        boolean isGzipCompressed = GZIP_CONTENT_ENCODING.equals(headers.get(CONTENT_ENCODING_HEADER.toUpperCase()));
-        boolean isCompressingAccepted = HttpManagementUtil.isCompressingAccepted(request, _managementConfiguration);
-        if (isGzipCompressed && !isCompressingAccepted)
-        {
-            headers.remove(CONTENT_ENCODING_HEADER);
-        }
-        long limit = -1;
-        if (headers.containsKey(CONTENT_LIMIT.toUpperCase()))
-        {
-            limit = Long.parseLong(String.valueOf(headers.get(CONTENT_LIMIT.toUpperCase())));
-        }
+        final boolean isGzipCompressed = GZIP_CONTENT_ENCODING.equals(headers.get(CONTENT_ENCODING_HEADER.toUpperCase()));
+        final boolean isCompressingAccepted = HttpManagementUtil.isCompressingAccepted(request, _managementConfiguration);
+
         OutputStream stream = response.getOutputStream();
 
         if (isGzipCompressed)
         {
             if (!isCompressingAccepted)
             {
-                if (limit > 0)
-                {
-                    stream = new LimitingOutputStream(stream, limit);
-                }
                 stream = new GunzipOutputStream(stream);
-            }
-            else
-            {
-                if (limit > 0)
-                {
-                    stream = new GunzipOutputStream(new LimitingOutputStream(new GZIPOutputStream(stream), limit));
-                }
+                headers.remove(CONTENT_ENCODING_HEADER.toUpperCase());
             }
         }
         else
@@ -386,6 +366,7 @@ public abstract class AbstractServlet ex
             if (isCompressingAccepted)
             {
                 stream = new GZIPOutputStream(stream);
+                headers.put(CONTENT_ENCODING_HEADER.toUpperCase(), GZIP_CONTENT_ENCODING);
             }
         }
 

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java?rev=1762460&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java Tue Sep 27 11:59:09 2016
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.bytebuffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.qpid.streams.CompositeInputStream;
+
+/**
+ * InputStream implementation that takes a collection of QpidByteBuffers and exposes them as one stream.
+ * The QpidByteBufferInputStream takes ownership of the buffers and disposes them on close().
+ *
+ * Not thread safe.
+ */
+public class QpidByteBufferInputStream extends InputStream
+{
+    private final CompositeInputStream _compositeInputStream;
+    private final Collection<QpidByteBuffer> _buffers;
+
+    public QpidByteBufferInputStream(Collection<QpidByteBuffer> buffers)
+    {
+        _buffers = buffers; // kept by reference (not copied) so close() can dispose every buffer
+
+        final Collection<InputStream> streams = new ArrayList<>(buffers.size());
+        for (QpidByteBuffer buffer : buffers)
+        {
+            streams.add(buffer.asInputStream());
+        }
+        _compositeInputStream = new CompositeInputStream(streams); // every stream operation below delegates to this
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+        return _compositeInputStream.read();
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        return _compositeInputStream.read(b, off, len);
+    }
+
+    @Override
+    public void mark(int readlimit)
+    {
+        _compositeInputStream.mark(readlimit);
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        _compositeInputStream.reset();
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return _compositeInputStream.markSupported();
+    }
+
+    @Override
+    public long skip(long n) throws IOException
+    {
+        return _compositeInputStream.skip(n);
+    }
+
+    @Override
+    public int available() throws IOException
+    {
+        return _compositeInputStream.available();
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        try
+        {
+            _compositeInputStream.close();
+        }
+        finally
+        {
+            for (QpidByteBuffer buffer : _buffers)
+            {
+                buffer.dispose(); // in finally: runs even if closing the composite stream throws
+            }
+        }
+    }
+}

Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java?rev=1762460&r1=1762459&r2=1762460&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java Tue Sep 27 11:59:09 2016
@@ -21,6 +21,7 @@
 package org.apache.qpid.systest;
 
 import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.HttpURLConnection;
@@ -209,7 +210,7 @@ public class MessageCompressionTest exte
         String content = new String(messageBytes, StandardCharsets.UTF_8);
         assertEquals("Unexpected message content :" + content, messageText, content);
 
-        messageBytes = _restTestHelper.getBytes(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id);
+        messageBytes = _restTestHelper.getBytes(queueRelativePath + "/getMessageContent?limit=1024&decompressBeforeLimiting=true&messageId=" + id);
         content = new String(messageBytes, StandardCharsets.UTF_8);
         assertEquals("Unexpected message content :" + content, messageText.substring(0, 1024), content);
     }
@@ -238,10 +239,42 @@ public class MessageCompressionTest exte
         String content = getDecompressedContent(queueRelativePath + "/getMessageContent?messageId=" + id);
         assertEquals("Unexpected message content :" + content, messageText, content);
 
-        content = getDecompressedContent(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id);
+        content = getDecompressedContent(queueRelativePath + "/getMessageContent?limit=1024&decompressBeforeLimiting=true&messageId=" + id);
         assertEquals("Unexpected message content :" + content, messageText.substring(0, 1024), content);
     }
 
+    public void testGetTruncatedContentViaRestForCompressedMessage() throws Exception
+    {
+        setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, String.valueOf(true));
+
+        doActualSetUp();
+
+        String messageText = createMessageText();
+        Connection senderConnection = getConnection(true);
+        String virtualPath = getConnectionFactory().getVirtualPath();
+        String testQueueName = getTestQueueName();
+        createAndBindQueue(virtualPath, testQueueName);
+
+        publishMessage(senderConnection, messageText);
+
+        String queueRelativePath = "queue" + virtualPath + virtualPath + "/" + testQueueName;
+
+        List<Map<String, Object>> messages = _restTestHelper.getJsonAsList(queueRelativePath + "/getMessageInfo");
+        assertEquals("Unexpected number of messages", 1, messages.size());
+        long id = ((Number) messages.get(0).get("id")).longValue();
+
+        _restTestHelper.setAcceptEncoding("gzip");
+        try
+        {
+            getDecompressedContent(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id); // no decompressBeforeLimiting: the limit is applied to the compressed bytes
+            fail("Should not be able to decompress truncated gzip");
+        }
+        catch (EOFException e)
+        {
+            // pass: the gzip stream was cut off at the limit, so it cannot be decompressed to the end
+        }
+    }
+
     private String getDecompressedContent(final String url) throws IOException
     {
         HttpURLConnection connection = _restTestHelper.openManagementConnection(url, "GET");



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org