You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/09/19 08:35:06 UTC
svn commit: r1761399 - in /qpid/java/trunk:
broker-core/src/main/java/org/apache/qpid/server/model/
broker-core/src/main/java/org/apache/qpid/server/queue/
broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/
broker-pl...
Author: orudyy
Date: Mon Sep 19 08:35:06 2016
New Revision: 1761399
URL: http://svn.apache.org/viewvc?rev=1761399&view=rev
Log:
QPID-7408: Address review comments
Added:
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java
qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java
Removed:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java
Modified:
qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1761399&r1=1761398&r2=1761399&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Sep 19 08:35:06 2016
@@ -21,7 +21,6 @@ package org.apache.qpid.server.queue;
import static org.apache.qpid.server.util.ParameterizedTypes.MAP_OF_STRING_STRING;
import java.io.IOException;
-import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
@@ -110,7 +109,6 @@ import org.apache.qpid.server.util.MapVa
import org.apache.qpid.server.util.ServerScopedRuntimeException;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.streams.CompositeInputStream;
import org.apache.qpid.transport.TransportException;
public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -2781,7 +2779,7 @@ public abstract class AbstractQueue<X ex
}
}
- class MessageContent extends BaseMessageContent implements CustomRestHeaders, StreamingContent
+ class MessageContent extends BaseMessageContent implements CustomRestHeaders
{
MessageContent(MessageReference<?> messageReference, long limit)
@@ -2843,41 +2841,13 @@ public abstract class AbstractQueue<X ex
return super.getContentDisposition();
}
- @Override
- public InputStream getInputStream()
- {
- ServerMessage message = _messageReference.getMessage();
- final Collection<QpidByteBuffer> content = message.getContent(0, (int) message.getSize());
- Collection<InputStream> streams = new ArrayList<>(content.size());
- for (QpidByteBuffer b : content)
- {
- streams.add(b.asInputStream());
- }
- return new CompositeInputStream(streams)
- {
- @Override
- public void close() throws IOException
- {
- try
- {
- super.close();
- }
- finally
- {
- for (QpidByteBuffer b : content)
- {
- b.dispose();
- }
- }
- }
- };
- }
-
- @Override
- public long getLimit()
+ @SuppressWarnings("unused")
+ @RestContentHeader("X-Content-Limit")
+ public Long getContentLimit()
{
return _limit;
}
+
}
private static class AcquireAllQueueEntryFilter implements QueueEntryFilter
Added: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java?rev=1761399&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java (added)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java Mon Sep 19 08:35:06 2016
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.management.plugin;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.zip.GZIPInputStream;
+
+/**
+ * An {@link OutputStream} which expects GZIP-compressed bytes to be written into it and
+ * forwards the decompressed bytes to a target output stream.
+ * <p>
+ * Compressed input is staged in a fixed-size internal buffer and inflated incrementally, so
+ * the compressed content may be supplied in chunks of any size.
+ * <p>
+ * Limit semantics: a negative limit means "no limit"; a positive limit is the maximum number
+ * of decompressed bytes forwarded to the target, anything beyond it is silently discarded;
+ * a limit of zero forwards nothing.
+ */
+public class GZIPOutputStreamAdapter extends OutputStream
+{
+    private static final int DEFAULT_BUFFER_SIZE = 2048;
+    private static final int MIN_GZIP_HEADER_SIZE = 10; // |ID1|ID2|CM |FLG|MTIME (4 bytes)|XFL|OS | ...excluding file name and comment
+
+    private final OutputStream _targetOutputStream;
+    private final BufferInputStream _bufferInputStream;
+    private final int _size;
+    private final long _limit;
+
+    private ResumableGZIPInputStream _gzipInputStream;
+    private long _writtenBytesNumber;
+    // set once the decompressor reports the end of the GZIP stream; later input is discarded
+    private boolean _endOfStream;
+
+    /**
+     * Creates an adapter using the default buffer size.
+     *
+     * @param targetOutputStream stream receiving the decompressed bytes
+     * @param limit maximum number of decompressed bytes to forward; negative for no limit
+     */
+    public GZIPOutputStreamAdapter(final OutputStream targetOutputStream, final long limit)
+    {
+        this(targetOutputStream, DEFAULT_BUFFER_SIZE, limit);
+    }
+
+    /**
+     * Creates an adapter.
+     *
+     * @param targetOutputStream stream receiving the decompressed bytes
+     * @param size size of the decompressor buffer; the staging buffer is twice this size
+     * @param limit maximum number of decompressed bytes to forward; negative for no limit
+     * @throws IllegalArgumentException if {@code size} is smaller than the minimal GZIP header
+     */
+    public GZIPOutputStreamAdapter(final OutputStream targetOutputStream, final int size, final long limit)
+    {
+        if (size < MIN_GZIP_HEADER_SIZE)
+        {
+            throw new IllegalArgumentException("Buffer size should be greater than or equal " + MIN_GZIP_HEADER_SIZE);
+        }
+        _size = size;
+        _targetOutputStream = targetOutputStream;
+        _bufferInputStream = new BufferInputStream(size * 2);
+        _limit = limit;
+    }
+
+    /**
+     * Closes the decompressor (if one was created) and then the target stream. The target
+     * stream is closed even when closing the decompressor fails.
+     */
+    @Override
+    public synchronized void close() throws IOException
+    {
+        try
+        {
+            if (_gzipInputStream != null)
+            {
+                _gzipInputStream.close();
+                _gzipInputStream = null;
+            }
+            _bufferInputStream.close();
+        }
+        finally
+        {
+            _targetOutputStream.close();
+        }
+    }
+
+    @Override
+    public synchronized void write(final int byteValue) throws IOException
+    {
+        this.write(new byte[]{(byte) byteValue}, 0, 1);
+    }
+
+    /**
+     * Accepts the next portion of GZIP-compressed data and forwards whatever can be
+     * decompressed so far to the target stream.
+     * <p>
+     * Input is discarded once the limit has been reached or the end of the GZIP stream has
+     * been seen.
+     *
+     * @throws IOException if the data is not valid GZIP content, if the GZIP header does not
+     *                     fit into the staging buffer, or if the target stream fails
+     */
+    @Override
+    public synchronized void write(final byte data[], final int offset, final int length) throws IOException
+    {
+        int consumed = 0;
+        // the limit is re-checked on every iteration: a single call may span several buffer fills
+        while (consumed < length && isAcceptingData())
+        {
+            final int buffered = _bufferInputStream.write(data, offset + consumed, length - consumed);
+            consumed += buffered;
+            final int pending = _bufferInputStream.available();
+
+            if (_gzipInputStream == null)
+            {
+                tryToOpenGzipStream();
+            }
+
+            if (_gzipInputStream != null && _bufferInputStream.available() > 0)
+            {
+                tryToDecompressAndWrite();
+            }
+
+            if (buffered == 0 && _bufferInputStream.available() == pending)
+            {
+                // the staging buffer is full and nothing could be taken out of it:
+                // fail instead of spinning forever
+                throw new IOException("Unable to decompress GZIP content using a buffer of "
+                                      + (_size * 2) + " bytes");
+            }
+        }
+    }
+
+    /** Returns true while decompressed output is still wanted and the stream has not ended. */
+    private boolean isAcceptingData()
+    {
+        return !_endOfStream && (_limit < 0 || (_limit > 0 && _writtenBytesNumber < _limit));
+    }
+
+    /**
+     * Tries to parse the GZIP header from the staged bytes. When the header is incomplete the
+     * staged bytes are left untouched so that the attempt can be repeated after more input
+     * arrives. Any other failure (for example, content which is not GZIP) is propagated.
+     */
+    private void tryToOpenGzipStream() throws IOException
+    {
+        _bufferInputStream.mark(_size);
+        try
+        {
+            _gzipInputStream = new ResumableGZIPInputStream(_bufferInputStream, _size);
+        }
+        catch (EOFException e)
+        {
+            // no sufficient bytes to read gzip header
+            _bufferInputStream.reset();
+        }
+    }
+
+    /**
+     * Decompresses as much of the staged input as possible and forwards it to the target,
+     * stopping at the limit, at the end of the GZIP stream, or when the staged input runs out.
+     */
+    private void tryToDecompressAndWrite() throws IOException
+    {
+        while (isAcceptingData())
+        {
+            int b;
+            _bufferInputStream.mark(_size);
+            try
+            {
+                b = _gzipInputStream.read();
+            }
+            catch (EOFException e)
+            {
+                // no sufficient data to decompress
+                if (_gzipInputStream.isTrailerPending())
+                {
+                    // an incomplete trailer was partially read: give those bytes back so the
+                    // trailer can be re-read as a whole once the rest of it arrives
+                    _bufferInputStream.reset();
+                }
+                return;
+            }
+
+            if (b == -1)
+            {
+                _endOfStream = true;
+                return;
+            }
+
+            _targetOutputStream.write(b);
+            _writtenBytesNumber++;
+        }
+    }
+
+    /**
+     * A {@link GZIPInputStream} which re-marks the underlying stream after every successful
+     * buffer fill and reports whether the compressed data has been fully inflated. Together
+     * these allow the caller to rewind trailer bytes consumed by an interrupted trailer read.
+     */
+    private static final class ResumableGZIPInputStream extends GZIPInputStream
+    {
+        private ResumableGZIPInputStream(final InputStream in, final int size) throws IOException
+        {
+            super(in, size);
+        }
+
+        @Override
+        protected void fill() throws IOException
+        {
+            super.fill();
+            // bytes handed to the inflater must never be replayed: move the mark past them
+            in.mark(buf.length);
+        }
+
+        /** Returns true when all compressed data was inflated, i.e. only the trailer can remain. */
+        private boolean isTrailerPending()
+        {
+            return inf.finished();
+        }
+    }
+
+    /**
+     * A fixed-capacity byte queue which is filled through {@link #write(byte[], int, int)} and
+     * drained through the {@link InputStream} API. An empty queue is reported as end of stream
+     * (-1); reading can be resumed after more bytes are written.
+     */
+    private static final class BufferInputStream extends InputStream
+    {
+        private final ByteBuffer _byteBuffer;
+
+        private BufferInputStream(int size)
+        {
+            if (size <= 0)
+            {
+                throw new IllegalArgumentException("Buffer size should be greater than zero");
+            }
+            _byteBuffer = ByteBuffer.allocate(size);
+            // start empty: nothing is readable until something is written
+            _byteBuffer.limit(0);
+        }
+
+        @Override
+        public int read() throws IOException
+        {
+            if (_byteBuffer.hasRemaining())
+            {
+                return _byteBuffer.get() & 0xFF;
+            }
+            return -1;
+        }
+
+        @Override
+        public int read(byte[] data, int offset, int length) throws IOException
+        {
+            if (length == 0)
+            {
+                return 0;
+            }
+            if (!_byteBuffer.hasRemaining())
+            {
+                return -1;
+            }
+            final int count = Math.min(length, _byteBuffer.remaining());
+            _byteBuffer.get(data, offset, count);
+            return count;
+        }
+
+        /** Remembers the current read position; {@code readlimit} is ignored. */
+        @Override
+        public void mark(int readlimit)
+        {
+            _byteBuffer.mark();
+        }
+
+        /** Moves the read position back to the last mark. */
+        @Override
+        public void reset() throws IOException
+        {
+            _byteBuffer.reset();
+        }
+
+        @Override
+        public boolean markSupported()
+        {
+            return true;
+        }
+
+        /** Skips at most the number of bytes currently buffered and returns the number skipped. */
+        @Override
+        public long skip(long n) throws IOException
+        {
+            if (n <= 0)
+            {
+                return 0;
+            }
+            final int skipped = (int) Math.min(n, (long) _byteBuffer.remaining());
+            _byteBuffer.position(_byteBuffer.position() + skipped);
+            return skipped;
+        }
+
+        @Override
+        public int available() throws IOException
+        {
+            return _byteBuffer.remaining();
+        }
+
+        @Override
+        public void close()
+        {
+
+        }
+
+        /**
+         * Appends bytes after the bytes which have not been read yet.
+         *
+         * @return the number of bytes actually stored; it is less than {@code length} when the
+         *         free space is insufficient and 0 when the buffer is full
+         */
+        public int write(byte[] data, int offset, int length)
+        {
+            final int free = _byteBuffer.capacity() - _byteBuffer.remaining();
+            final int numberOfBytes = Math.min(length, free);
+            if (numberOfBytes <= 0)
+            {
+                return 0;
+            }
+            // move the unread bytes to the beginning and switch the buffer into "append" mode
+            _byteBuffer.compact();
+            _byteBuffer.put(data, offset, numberOfBytes);
+            // back to "read" mode: position 0, limit = unread + appended
+            _byteBuffer.flip();
+            return numberOfBytes;
+        }
+    }
+}
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java?rev=1761399&r1=1761398&r2=1761399&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java Mon Sep 19 08:35:06 2016
@@ -178,7 +178,7 @@ public class HttpManagementUtil
throws IOException
{
OutputStream outputStream;
- if(isCompressing(request, managementConfiguration))
+ if(isCompressingAccepted(request, managementConfiguration))
{
outputStream = new GZIPOutputStream(response.getOutputStream());
response.setHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_ENCODING);
@@ -190,8 +190,8 @@ public class HttpManagementUtil
return outputStream;
}
- public static boolean isCompressing(final HttpServletRequest request,
- final HttpManagementConfiguration managementConfiguration)
+ public static boolean isCompressingAccepted(final HttpServletRequest request,
+ final HttpManagementConfiguration managementConfiguration)
{
return managementConfiguration.isCompressResponses()
&& Collections.list(request.getHeaderNames()).contains(ACCEPT_ENCODING_HEADER)
Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java?rev=1761399&r1=1761398&r2=1761399&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java Mon Sep 19 08:35:06 2016
@@ -32,7 +32,6 @@ import java.security.PrivilegedException
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import java.util.zip.GZIPInputStream;
import javax.security.auth.Subject;
import javax.servlet.ServletConfig;
@@ -48,7 +47,6 @@ import org.apache.qpid.server.model.Rest
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
-import com.google.common.io.ByteStreams;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,14 +54,15 @@ import org.apache.qpid.server.management
import org.apache.qpid.server.management.plugin.HttpManagementUtil;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObjectJacksonModule;
-import org.apache.qpid.server.model.StreamingContent;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.management.plugin.GZIPOutputStreamAdapter;
public abstract class AbstractServlet extends HttpServlet
{
public static final int SC_UNPROCESSABLE_ENTITY = 422;
private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServlet.class);
public static final String CONTENT_DISPOSITION = "Content-disposition";
+ public static final String CONTENT_LIMIT = "X-Content-Limit";
private Broker<?> _broker;
private HttpManagementConfiguration _managementConfiguration;
@@ -330,19 +329,22 @@ public abstract class AbstractServlet ex
{
Map<String, Object> headers = getResponseHeaders(content);
boolean isGzipCompressed = GZIP_CONTENT_ENCODING.equals(headers.get(CONTENT_ENCODING_HEADER.toUpperCase()));
- boolean isCompressing = HttpManagementUtil.isCompressing(request, _managementConfiguration);
-
- if (isGzipCompressed && content instanceof StreamingContent
- && ((((StreamingContent) content).getLimit() >= 0 || !isCompressing)))
+ boolean isCompressingAccepted = HttpManagementUtil.isCompressingAccepted(request, _managementConfiguration);
+ if (isGzipCompressed && !isCompressingAccepted)
{
headers.remove(CONTENT_ENCODING_HEADER);
- content = new DecompressingContent((StreamingContent) content);
- isGzipCompressed = false;
+ }
+ long limit = -1;
+ if (headers.containsKey(CONTENT_LIMIT.toUpperCase()))
+ {
+ limit = Long.parseLong(String.valueOf(headers.get(CONTENT_LIMIT.toUpperCase())));
}
- try (OutputStream os = isGzipCompressed && isCompressing
+ try (OutputStream os = isGzipCompressed && isCompressingAccepted && limit < 0
? response.getOutputStream()
- : getOutputStream(request, response))
+ : isGzipCompressed && (!isCompressingAccepted || limit >= 0)
+ ? new GZIPOutputStreamAdapter(getOutputStream(request, response), limit)
+ : getOutputStream(request, response))
{
response.setStatus(HttpServletResponse.SC_OK);
for (Map.Entry<String, Object> entry : headers.entrySet())
@@ -422,33 +424,4 @@ public abstract class AbstractServlet ex
return results;
}
- private class DecompressingContent implements Content
- {
- private final StreamingContent _content;
-
- public DecompressingContent(final StreamingContent content)
- {
- _content = content;
- }
-
- @Override
- public void write(final OutputStream os) throws IOException
- {
- try (GZIPInputStream gzipInputStream = new GZIPInputStream(_content.getInputStream());)
- {
- ByteStreams.copy(_content.getLimit() >= 0
- ? ByteStreams.limit(gzipInputStream, _content.getLimit())
- : gzipInputStream, os);
- }
- }
-
- @Override
- public void release()
- {
- if (_content instanceof Content)
- {
- ((Content)_content).release();
- }
- }
- }
}
Added: qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java?rev=1761399&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java (added)
+++ qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java Mon Sep 19 08:35:06 2016
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.management.plugin;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+
+
+/**
+ * Tests {@link GZIPOutputStreamAdapter} by feeding it GZIP-compressed text in fixed-size
+ * chunks and comparing the bytes which reach the target stream with the original text.
+ */
+public class GZIPOutputStreamAdapterTest extends QpidTestCase
+{
+    /** Size of the chunks in which compressed data is handed to the adapter. */
+    private static final int CHUNK_SIZE = 256;
+
+    /** Without a limit the whole text must be restored. */
+    public void testDecompressing() throws IOException
+    {
+        String testText = generateTestText();
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        GZIPOutputStreamAdapter adapter = new GZIPOutputStreamAdapter(outputStream, 128, -1);
+
+        compressAndDecompressWithAdapter(testText, adapter);
+
+        assertEquals("Unexpected content", testText, new String(outputStream.toByteArray()));
+    }
+
+    /** With a limit only the leading part of the text of that length must be restored. */
+    public void testDecompressingLimited() throws IOException
+    {
+        String testText = generateTestText();
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        GZIPOutputStreamAdapter adapter = new GZIPOutputStreamAdapter(outputStream, 128, testText.length() / 2);
+
+        compressAndDecompressWithAdapter(testText, adapter);
+
+        assertEquals("Unexpected content",
+                     testText.substring(0, testText.length() / 2),
+                     new String(outputStream.toByteArray()));
+    }
+
+    /**
+     * Compresses the text and writes the compressed bytes into the adapter chunk by chunk.
+     * Only the bytes belonging to the compressed data are written: the last chunk is
+     * usually shorter than {@link #CHUNK_SIZE}.
+     */
+    private void compressAndDecompressWithAdapter(final String testText, final GZIPOutputStreamAdapter adapter) throws IOException
+    {
+        byte[] data = compress(testText);
+        int written = 0;
+        while (written < data.length)
+        {
+            int length = Math.min(data.length - written, CHUNK_SIZE);
+            adapter.write(data, written, length);
+            written += length;
+        }
+    }
+
+    /** Returns the GZIP-compressed bytes of the given text. */
+    private byte[] compress(final String testText) throws IOException
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (GZIPOutputStream zipStream = new GZIPOutputStream(baos))
+        {
+            zipStream.write(testText.getBytes());
+        }
+        return baos.toByteArray();
+    }
+
+    /** Builds a text of at least 5000 characters from a numbered repeating phrase. */
+    private String generateTestText()
+    {
+        StringBuilder sb = new StringBuilder();
+        int i = 0;
+        while (sb.length() < 5000)
+        {
+            sb.append(" A simple test text ").append(i++);
+        }
+        return sb.toString();
+    }
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org