You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2016/09/27 11:59:09 UTC
svn commit: r1762460 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/
broker-pl...
Author: lquack
Date: Tue Sep 27 11:59:09 2016
New Revision: 1762460
URL: http://svn.apache.org/viewvc?rev=1762460&view=rev
Log:
QPID-7409: [Java Broker] Move responsibility to limit message content to ManagedOperation getMessageContent
* returnJson always returns uncompressed data
* new parameter decompressBeforeLimiting (default "false")
Added:
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
Removed:
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/LimitingOutputStream.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java?rev=1762460&r1=1762459&r2=1762460&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/Queue.java Tue Sep 27 11:59:09 2016
@@ -311,7 +311,11 @@ public interface Queue<X extends Queue<X
@Param(name = "returnJson", defaultValue = "false",
description = "If true, converts message content into json format"
+ " if message mime-type is either amqp/map or amqp/list"
- + " or jms/map-message. Default is false.") boolean returnJson);
+ + " or jms/map-message. Default is false.") boolean returnJson,
+ @Param(name = "decompressBeforeLimiting", defaultValue = "false",
+ description = "If true, the operation will attempt to decompress the message"
+ + "(should it be compressed) before applying any limit. If"
+ + "decompression fails the operation will fail.") boolean decompressBeforeLimiting);
@ManagedOperation(nonModifying = true, paramRequiringSecure = "includeHeaders")
List<MessageInfo> getMessageInfo(@Param(name = "first", defaultValue = "-1") int first,
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1762460&r1=1762459&r2=1762460&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Tue Sep 27 11:59:09 2016
@@ -19,8 +19,10 @@
package org.apache.qpid.server.queue;
import static org.apache.qpid.server.util.ParameterizedTypes.MAP_OF_STRING_STRING;
+import static org.apache.qpid.util.GZIPUtils.GZIP_CONTENT_ENCODING;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -31,6 +33,7 @@ import java.security.AccessController;
import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
@@ -50,9 +53,12 @@ import java.util.concurrent.RejectedExec
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import javax.security.auth.Subject;
+import com.google.common.io.ByteStreams;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
@@ -60,7 +66,7 @@ import com.google.common.util.concurrent
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.bytebuffer.QpidByteBufferInputStream;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
import org.apache.qpid.filter.selector.TokenMgrError;
@@ -110,7 +116,6 @@ import org.apache.qpid.server.util.Serve
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
import org.apache.qpid.transport.TransportException;
-import org.apache.qpid.util.GZIPUtils;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
extends AbstractConfiguredObject<X>
@@ -2690,26 +2695,54 @@ public abstract class AbstractQueue<X ex
}
}
- abstract class BaseMessageContent implements Content
+ abstract class BaseMessageContent implements Content, CustomRestHeaders
{
public static final int UNLIMITED = -1;
protected final MessageReference<?> _messageReference;
protected final long _limit;
+ private final boolean _truncated;
BaseMessageContent(MessageReference<?> messageReference, long limit)
{
_messageReference = messageReference;
_limit = limit;
+ _truncated = limit >= 0 && _messageReference.getMessage().getSize() > limit;
}
@Override
- public void release()
+ public final void release()
{
_messageReference.release();
}
- public abstract String getContentType();
+ protected boolean isTruncated()
+ {
+ return _truncated;
+ }
+ @SuppressWarnings("unused")
+ @RestContentHeader("X-Content-Truncated")
+ public String getContentTruncated()
+ {
+ return String.valueOf(isTruncated());
+ }
+
+ @SuppressWarnings("unused")
+ @RestContentHeader("Content-Type")
+ public String getContentType()
+ {
+ return _messageReference.getMessage().getMessageHeader().getMimeType();
+ }
+
+ @SuppressWarnings("unused")
+ @RestContentHeader("Content-Encoding")
+ public String getContentEncoding()
+ {
+ return _messageReference.getMessage().getMessageHeader().getEncoding();
+ }
+
+ @SuppressWarnings("unused")
+ @RestContentHeader("Content-Disposition")
public String getContentDisposition()
{
try
@@ -2739,114 +2772,89 @@ public abstract class AbstractQueue<X ex
}
}
- class JsonMessageContent extends BaseMessageContent implements CustomRestHeaders
+ class JsonMessageContent extends BaseMessageContent
{
private final InternalMessage _internalMessage;
- private boolean _truncate = false;
JsonMessageContent(MessageReference<?> messageReference, InternalMessage message, long limit)
{
super(messageReference, limit);
_internalMessage = message;
- _truncate = limit >= 0 && _messageReference.getMessage().getSize() > limit;
}
@Override
- public void write(final OutputStream outputStream) throws IOException
+ public void write(OutputStream outputStream) throws IOException
{
Object messageBody = _internalMessage.getMessageBody();
- new MessageContentJsonConverter(messageBody, _truncate ? _limit : UNLIMITED).convertAndWrite(outputStream);
+ new MessageContentJsonConverter(messageBody, isTruncated() ? _limit : UNLIMITED).convertAndWrite(outputStream);
}
@SuppressWarnings("unused")
- @RestContentHeader("X-Content-Truncated")
- public String getContentTruncated()
+ @Override
+ @RestContentHeader("Content-Encoding")
+ public String getContentEncoding()
{
- return String.valueOf(_truncate);
+ return "identity";
}
@SuppressWarnings("unused")
+ @Override
@RestContentHeader("Content-Type")
public String getContentType()
{
return "application/json";
}
-
- @SuppressWarnings("unused")
- @RestContentHeader("Content-Disposition")
- public String getContentDisposition()
- {
- return super.getContentDisposition();
- }
}
- class MessageContent extends BaseMessageContent implements CustomRestHeaders
+ class MessageContent extends BaseMessageContent
{
- MessageContent(MessageReference<?> messageReference, long limit)
- {
- super(messageReference, limit);
- }
+ private boolean _decompressBeforeLimiting;
- @Override
- public void write(OutputStream outputStream) throws IOException
+ MessageContent(MessageReference<?> messageReference, long limit, boolean decompressBeforeLimiting)
{
- ServerMessage message = _messageReference.getMessage();
- int length = (int) ((_limit == UNLIMITED || GZIPUtils.GZIP_CONTENT_ENCODING.equals(getContentEncoding())) ? message.getSize() : _limit);
- Collection<QpidByteBuffer> content = message.getContent(0, length);
- try
+ super(messageReference, limit);
+ if (decompressBeforeLimiting)
{
- for (QpidByteBuffer b : content)
+ String contentEncoding = getContentEncoding();
+ if (GZIP_CONTENT_ENCODING.equals(contentEncoding))
{
- int len = b.remaining();
- byte[] data = new byte[len];
- b.get(data);
- outputStream.write(data);
+ _decompressBeforeLimiting = true;
}
- }
- finally
- {
- for (QpidByteBuffer b : content)
+ else if (contentEncoding != null && !"".equals(contentEncoding) && !"identity".equals(contentEncoding))
{
- b.dispose();
+ throw new IllegalArgumentException(String.format(
+ "Requested decompression of message with unknown compression '%s'", contentEncoding));
}
}
}
@Override
- public void release()
- {
- _messageReference.release();
- }
-
- @SuppressWarnings("unused")
- @RestContentHeader("Content-Type")
- public String getContentType()
+ public void write(OutputStream outputStream) throws IOException
{
- return _messageReference.getMessage().getMessageHeader().getMimeType();
- }
+ ServerMessage message = _messageReference.getMessage();
- @SuppressWarnings("unused")
- @RestContentHeader("Content-Encoding")
- public String getContentEncoding()
- {
- return _messageReference.getMessage().getMessageHeader().getEncoding();
- }
+ int length = (int) ((_limit == UNLIMITED || _decompressBeforeLimiting) ? message.getSize() : _limit);
+ InputStream inputStream = new QpidByteBufferInputStream(message.getContent(0, length));
- @SuppressWarnings("unused")
- @RestContentHeader("Content-Disposition")
- public String getContentDisposition()
- {
- return super.getContentDisposition();
- }
+ if (_limit != UNLIMITED && _decompressBeforeLimiting)
+ {
+ inputStream = new GZIPInputStream(inputStream);
+ inputStream = ByteStreams.limit(inputStream, _limit);
+ outputStream = new GZIPOutputStream(outputStream);
+ }
- @SuppressWarnings("unused")
- @RestContentHeader("X-Content-Limit")
- public Long getContentLimit()
- {
- return _limit;
+ try
+ {
+ long foo = ByteStreams.copy(inputStream, outputStream);
+ foo = foo +1 -1;
+ }
+ finally
+ {
+ outputStream.close();
+ inputStream.close();
+ }
}
-
}
private static class AcquireAllQueueEntryFilter implements QueueEntryFilter
@@ -3613,13 +3621,13 @@ public abstract class AbstractQueue<X ex
}
@Override
- public Content getMessageContent(final long messageId, final long limit, boolean returnJson)
+ public Content getMessageContent(final long messageId, final long limit, boolean returnJson, boolean decompressBeforeLimiting)
{
final MessageContentFinder messageFinder = new MessageContentFinder(messageId);
visit(messageFinder);
if (messageFinder.isFound())
{
- return createMessageContent(messageFinder.getMessageReference(), returnJson, limit);
+ return createMessageContent(messageFinder.getMessageReference(), returnJson, limit, decompressBeforeLimiting);
}
else
{
@@ -3629,7 +3637,8 @@ public abstract class AbstractQueue<X ex
private Content createMessageContent(final MessageReference<?> messageReference,
final boolean returnJson,
- final long limit)
+ final long limit,
+ final boolean decompressBeforeLimiting)
{
String mimeType = messageReference.getMessage().getMessageHeader().getMimeType();
if (returnJson && ("amqp/list".equalsIgnoreCase(mimeType)
@@ -3653,7 +3662,7 @@ public abstract class AbstractQueue<X ex
}
}
}
- return new MessageContent(messageReference, limit);
+ return new MessageContent(messageReference, limit, decompressBeforeLimiting);
}
@Override
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java?rev=1762460&r1=1762459&r2=1762460&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java Tue Sep 27 11:59:09 2016
@@ -50,7 +50,6 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.server.management.plugin.GunzipOutputStream;
import org.apache.qpid.server.management.plugin.HttpManagementConfiguration;
import org.apache.qpid.server.management.plugin.HttpManagementUtil;
-import org.apache.qpid.server.management.plugin.LimitingOutputStream;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObjectJacksonModule;
import org.apache.qpid.server.model.Content;
@@ -63,7 +62,6 @@ public abstract class AbstractServlet ex
public static final int SC_UNPROCESSABLE_ENTITY = 422;
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServlet.class);
public static final String CONTENT_DISPOSITION = "Content-disposition";
- public static final String CONTENT_LIMIT = "X-Content-Limit";
private Broker<?> _broker;
private HttpManagementConfiguration _managementConfiguration;
@@ -350,35 +348,17 @@ public abstract class AbstractServlet ex
final HttpServletResponse response,
Map<String, Object> headers) throws IOException
{
- boolean isGzipCompressed = GZIP_CONTENT_ENCODING.equals(headers.get(CONTENT_ENCODING_HEADER.toUpperCase()));
- boolean isCompressingAccepted = HttpManagementUtil.isCompressingAccepted(request, _managementConfiguration);
- if (isGzipCompressed && !isCompressingAccepted)
- {
- headers.remove(CONTENT_ENCODING_HEADER);
- }
- long limit = -1;
- if (headers.containsKey(CONTENT_LIMIT.toUpperCase()))
- {
- limit = Long.parseLong(String.valueOf(headers.get(CONTENT_LIMIT.toUpperCase())));
- }
+ final boolean isGzipCompressed = GZIP_CONTENT_ENCODING.equals(headers.get(CONTENT_ENCODING_HEADER.toUpperCase()));
+ final boolean isCompressingAccepted = HttpManagementUtil.isCompressingAccepted(request, _managementConfiguration);
+
OutputStream stream = response.getOutputStream();
if (isGzipCompressed)
{
if (!isCompressingAccepted)
{
- if (limit > 0)
- {
- stream = new LimitingOutputStream(stream, limit);
- }
stream = new GunzipOutputStream(stream);
- }
- else
- {
- if (limit > 0)
- {
- stream = new GunzipOutputStream(new LimitingOutputStream(new GZIPOutputStream(stream), limit));
- }
+ headers.remove(CONTENT_ENCODING_HEADER.toUpperCase());
}
}
else
@@ -386,6 +366,7 @@ public abstract class AbstractServlet ex
if (isCompressingAccepted)
{
stream = new GZIPOutputStream(stream);
+ headers.put(CONTENT_ENCODING_HEADER.toUpperCase(), GZIP_CONTENT_ENCODING);
}
}
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java?rev=1762460&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java Tue Sep 27 11:59:09 2016
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.bytebuffer;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import org.apache.qpid.streams.CompositeInputStream;
+
+/**
+ * InputStream implementation that takes a list of QpidByteBuffers.
+ * The QpidByteBufferInputStream takes ownership of the buffers and disposes them on close().
+ *
+ * Not thread safe.
+ */
+public class QpidByteBufferInputStream extends InputStream
+{
+ private final CompositeInputStream _compositeInputStream;
+ private final Collection<QpidByteBuffer> _buffers;
+
+ public QpidByteBufferInputStream(Collection<QpidByteBuffer> buffers)
+ {
+ _buffers = buffers;
+
+ final Collection<InputStream> streams = new ArrayList<>(buffers.size());
+ for (QpidByteBuffer buffer : buffers)
+ {
+ streams.add(buffer.asInputStream());
+ }
+ _compositeInputStream = new CompositeInputStream(streams);
+ }
+
+ @Override
+ public int read() throws IOException
+ {
+ return _compositeInputStream.read();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ return _compositeInputStream.read(b, off, len);
+ }
+
+ @Override
+ public void mark(int readlimit)
+ {
+ _compositeInputStream.mark(readlimit);
+ }
+
+ @Override
+ public void reset() throws IOException
+ {
+ _compositeInputStream.reset();
+ }
+
+ @Override
+ public boolean markSupported()
+ {
+ return _compositeInputStream.markSupported();
+ }
+
+ @Override
+ public long skip(long n) throws IOException
+ {
+ return _compositeInputStream.skip(n);
+ }
+
+ @Override
+ public int available() throws IOException
+ {
+ return _compositeInputStream.available();
+ }
+
+ @Override
+ public void close() throws IOException
+ {
+ try
+ {
+ _compositeInputStream.close();
+ }
+ finally
+ {
+ for (QpidByteBuffer buffer : _buffers)
+ {
+ buffer.dispose();
+ }
+ }
+ }
+}
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java?rev=1762460&r1=1762459&r2=1762460&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/systest/MessageCompressionTest.java Tue Sep 27 11:59:09 2016
@@ -21,6 +21,7 @@
package org.apache.qpid.systest;
import java.io.ByteArrayOutputStream;
+import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
@@ -209,7 +210,7 @@ public class MessageCompressionTest exte
String content = new String(messageBytes, StandardCharsets.UTF_8);
assertEquals("Unexpected message content :" + content, messageText, content);
- messageBytes = _restTestHelper.getBytes(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id);
+ messageBytes = _restTestHelper.getBytes(queueRelativePath + "/getMessageContent?limit=1024&decompressBeforeLimiting=true&messageId=" + id);
content = new String(messageBytes, StandardCharsets.UTF_8);
assertEquals("Unexpected message content :" + content, messageText.substring(0, 1024), content);
}
@@ -238,10 +239,42 @@ public class MessageCompressionTest exte
String content = getDecompressedContent(queueRelativePath + "/getMessageContent?messageId=" + id);
assertEquals("Unexpected message content :" + content, messageText, content);
- content = getDecompressedContent(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id);
+ content = getDecompressedContent(queueRelativePath + "/getMessageContent?limit=1024&decompressBeforeLimiting=true&messageId=" + id);
assertEquals("Unexpected message content :" + content, messageText.substring(0, 1024), content);
}
+ public void testGetTruncatedContentViaRestForCompressedMessage() throws Exception
+ {
+ setTestSystemProperty(Broker.BROKER_MESSAGE_COMPRESSION_ENABLED, String.valueOf(true));
+
+ doActualSetUp();
+
+ String messageText = createMessageText();
+ Connection senderConnection = getConnection(true);
+ String virtualPath = getConnectionFactory().getVirtualPath();
+ String testQueueName = getTestQueueName();
+ createAndBindQueue(virtualPath, testQueueName);
+
+ publishMessage(senderConnection, messageText);
+
+ String queueRelativePath = "queue" + virtualPath + virtualPath + "/" + testQueueName;
+
+ List<Map<String, Object>> messages = _restTestHelper.getJsonAsList(queueRelativePath + "/getMessageInfo");
+ assertEquals("Unexpected number of messages", 1, messages.size());
+ long id = ((Number) messages.get(0).get("id")).longValue();
+
+ _restTestHelper.setAcceptEncoding("gzip");
+ try
+ {
+ getDecompressedContent(queueRelativePath + "/getMessageContent?limit=1024&messageId=" + id);
+ fail("Should not be able to decompress truncated gzip");
+ }
+ catch (EOFException e)
+ {
+ // pass
+ }
+ }
+
private String getDecompressedContent(final String url) throws IOException
{
HttpURLConnection connection = _restTestHelper.openManagementConnection(url, "GET");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org