You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2016/09/19 08:35:06 UTC

svn commit: r1761399 - in /qpid/java/trunk: broker-core/src/main/java/org/apache/qpid/server/model/ broker-core/src/main/java/org/apache/qpid/server/queue/ broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/ broker-pl...

Author: orudyy
Date: Mon Sep 19 08:35:06 2016
New Revision: 1761399

URL: http://svn.apache.org/viewvc?rev=1761399&view=rev
Log:
QPID-7408: Address review comments

Added:
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java
    qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java
Removed:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/model/StreamingContent.java
Modified:
    qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
    qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java

Modified: qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java?rev=1761399&r1=1761398&r2=1761399&view=diff
==============================================================================
--- qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java (original)
+++ qpid/java/trunk/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java Mon Sep 19 08:35:06 2016
@@ -21,7 +21,6 @@ package org.apache.qpid.server.queue;
 import static org.apache.qpid.server.util.ParameterizedTypes.MAP_OF_STRING_STRING;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.io.OutputStream;
 import java.io.UnsupportedEncodingException;
 import java.net.URLEncoder;
@@ -110,7 +109,6 @@ import org.apache.qpid.server.util.MapVa
 import org.apache.qpid.server.util.ServerScopedRuntimeException;
 import org.apache.qpid.server.util.StateChangeListener;
 import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException;
-import org.apache.qpid.streams.CompositeInputStream;
 import org.apache.qpid.transport.TransportException;
 
 public abstract class AbstractQueue<X extends AbstractQueue<X>>
@@ -2781,7 +2779,7 @@ public abstract class AbstractQueue<X ex
         }
     }
 
-    class MessageContent extends BaseMessageContent implements CustomRestHeaders, StreamingContent
+    class MessageContent extends BaseMessageContent implements CustomRestHeaders
     {
 
         MessageContent(MessageReference<?> messageReference, long limit)
@@ -2843,41 +2841,13 @@ public abstract class AbstractQueue<X ex
             return super.getContentDisposition();
         }
 
-        @Override
-        public InputStream getInputStream()
-        {
-            ServerMessage message = _messageReference.getMessage();
-            final Collection<QpidByteBuffer> content = message.getContent(0, (int) message.getSize());
-            Collection<InputStream> streams = new ArrayList<>(content.size());
-            for (QpidByteBuffer b : content)
-            {
-                streams.add(b.asInputStream());
-            }
-            return new CompositeInputStream(streams)
-            {
-                @Override
-                public void close() throws IOException
-                {
-                    try
-                    {
-                        super.close();
-                    }
-                    finally
-                    {
-                        for (QpidByteBuffer b : content)
-                        {
-                            b.dispose();
-                        }
-                    }
-                }
-            };
-        }
-
-        @Override
-        public long getLimit()
+        @SuppressWarnings("unused")
+        @RestContentHeader("X-Content-Limit")
+        public Long getContentLimit()
         {
             return _limit;
         }
+
     }
 
     private static class AcquireAllQueueEntryFilter implements QueueEntryFilter

Added: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java?rev=1761399&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java (added)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapter.java Mon Sep 19 08:35:06 2016
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.management.plugin;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.util.zip.GZIPInputStream;
+
+public class GZIPOutputStreamAdapter extends OutputStream
+{
+    private static final int DEFAULT_BUFFER_SIZE = 2048;
+    private static final int MIN_GZIP_HEADER_SIZE = 10; // |ID1|ID2|CM |FLG|MTIME (4 bytes)|XFL|OS |  ...excluding file name and comment
+
+    private final OutputStream _targetOutputStream;
+    private final BufferInputStream _bufferInputStream;
+    private final int _size;
+    private final long _limit;
+
+    private GZIPInputStream _gzipInputStream;
+    private long _writtenBytesNumber;
+
+    public GZIPOutputStreamAdapter(final OutputStream targetOutputStream, final long limit)
+    {
+        this(targetOutputStream, DEFAULT_BUFFER_SIZE, limit);
+    }
+
+    public GZIPOutputStreamAdapter(final OutputStream targetOutputStream, final int size, final long limit)
+    {
+        if (size < MIN_GZIP_HEADER_SIZE)
+        {
+            throw new IllegalArgumentException("Buffer size should be greater than or equal " + MIN_GZIP_HEADER_SIZE);
+        }
+        _size = size;
+        _targetOutputStream = targetOutputStream;
+        _bufferInputStream = new BufferInputStream(size * 2);
+        _limit = limit;
+    }
+
+    @Override
+    public synchronized void close() throws IOException
+    {
+        try
+        {
+            if (_gzipInputStream != null)
+            {
+                _gzipInputStream.close();
+                _gzipInputStream = null;
+            }
+            _bufferInputStream.close();
+        }
+        finally
+        {
+            _targetOutputStream.close();
+        }
+    }
+
+    @Override
+    public synchronized void write(final int byteValue) throws IOException
+    {
+        this.write(new byte[]{(byte) byteValue}, 0, 1);
+    }
+
+    @Override
+    public synchronized void write(final byte data[], final int offset, final int length) throws IOException
+    {
+        if (_limit < 0 || (_limit > 0 && _writtenBytesNumber < _limit))
+        {
+            int numberOfWrittenBytes = 0;
+            do
+            {
+                numberOfWrittenBytes +=
+                        _bufferInputStream.write(data, offset + numberOfWrittenBytes, length - numberOfWrittenBytes);
+                if (_gzipInputStream == null)
+                {
+                    _bufferInputStream.mark(_size);
+                    try
+                    {
+                        _gzipInputStream = new GZIPInputStream(_bufferInputStream, _size);
+                    }
+                    catch (IOException e)
+                    {
+                        // no sufficient bytes to read gzip header
+                        _bufferInputStream.reset();
+                    }
+                }
+
+                if (_gzipInputStream != null && _bufferInputStream.available() > 0)
+                {
+                    tryToDecompressAndWrite();
+                }
+            }
+            while (numberOfWrittenBytes < length);
+        }
+    }
+
+    private void tryToDecompressAndWrite() throws IOException
+    {
+        int b = -1;
+        do
+        {
+            try
+            {
+                b = _gzipInputStream.read();
+            }
+            catch (EOFException e)
+            {
+                // no sufficient data to decompress
+                break;
+            }
+
+            if (b != -1)
+            {
+                _targetOutputStream.write(b);
+                _writtenBytesNumber++;
+
+                if (_limit > 0 && _writtenBytesNumber == _limit)
+                {
+                    break;
+                }
+            }
+        }
+        while (b != -1);
+    }
+
+    private static final class BufferInputStream extends InputStream
+    {
+        private final ByteBuffer _byteBuffer;
+
+        private BufferInputStream(int size)
+        {
+            if (size <= 0)
+            {
+                throw new IllegalArgumentException("Buffer size should be greater than zero");
+            }
+            _byteBuffer = ByteBuffer.allocate(size);
+            _byteBuffer.limit(0);
+        }
+
+        @Override
+        public int read() throws IOException
+        {
+            if (_byteBuffer.hasRemaining())
+            {
+                return _byteBuffer.get() & 0xFF;
+            }
+            return -1;
+        }
+
+
+        @Override
+        public int read(byte[] data, int offset, int length) throws IOException
+        {
+            if (!_byteBuffer.hasRemaining())
+            {
+                return -1;
+            }
+            if (_byteBuffer.remaining() < length)
+            {
+                length = _byteBuffer.remaining();
+            }
+            _byteBuffer.get(data, offset, length);
+
+            return length;
+        }
+
+        @Override
+        public void mark(int readlimit)
+        {
+            _byteBuffer.mark();
+        }
+
+        @Override
+        public void reset() throws IOException
+        {
+            _byteBuffer.reset();
+        }
+
+        @Override
+        public boolean markSupported()
+        {
+            return true;
+        }
+
+        @Override
+        public long skip(long n) throws IOException
+        {
+            _byteBuffer.position(_byteBuffer.position() + (int) n);
+            return n;
+        }
+
+        @Override
+        public int available() throws IOException
+        {
+            return _byteBuffer.remaining();
+        }
+
+        @Override
+        public void close()
+        {
+
+        }
+
+        public int write(byte[] data, int offset, int length)
+        {
+            int numberOfBytes = 0; // bytes accepted by this call; may be fewer than length, or 0 when full
+            int capacity = _byteBuffer.capacity();
+            int remaining = _byteBuffer.remaining(); // unread bytes still held in the buffer
+            if (remaining == 0)
+            {
+                numberOfBytes = Math.min(length, capacity);
+                _byteBuffer.position(0);
+                _byteBuffer.limit(numberOfBytes);
+                _byteBuffer.put(data, offset, numberOfBytes);
+                _byteBuffer.flip();
+            }
+            else if (remaining < capacity)
+            {
+                byte[] array = _byteBuffer.array();
+                int position = _byteBuffer.position();
+                if (position > 0)
+                {
+                    // Compact: slide the unread bytes down to index 0, preserving their order,
+                    // so that the new data can be appended directly after them.
+                    // System.arraycopy copies overlapping regions of the same array safely.
+                    System.arraycopy(array, position, array, 0, remaining);
+                }
+
+                numberOfBytes = Math.min(length, capacity - remaining);
+                System.arraycopy(data, offset, array, remaining, numberOfBytes);
+
+                _byteBuffer.position(0);
+                _byteBuffer.limit(remaining + numberOfBytes);
+            }
+            return numberOfBytes;
+        }
+    }
+}

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java?rev=1761399&r1=1761398&r2=1761399&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/HttpManagementUtil.java Mon Sep 19 08:35:06 2016
@@ -178,7 +178,7 @@ public class HttpManagementUtil
             throws IOException
     {
         OutputStream outputStream;
-        if(isCompressing(request, managementConfiguration))
+        if(isCompressingAccepted(request, managementConfiguration))
         {
             outputStream = new GZIPOutputStream(response.getOutputStream());
             response.setHeader(CONTENT_ENCODING_HEADER, GZIP_CONTENT_ENCODING);
@@ -190,8 +190,8 @@ public class HttpManagementUtil
         return outputStream;
     }
 
-    public static boolean isCompressing(final HttpServletRequest request,
-                                        final HttpManagementConfiguration managementConfiguration)
+    public static boolean isCompressingAccepted(final HttpServletRequest request,
+                                                final HttpManagementConfiguration managementConfiguration)
     {
         return managementConfiguration.isCompressResponses()
                && Collections.list(request.getHeaderNames()).contains(ACCEPT_ENCODING_HEADER)

Modified: qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java?rev=1761399&r1=1761398&r2=1761399&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java (original)
+++ qpid/java/trunk/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/AbstractServlet.java Mon Sep 19 08:35:06 2016
@@ -32,7 +32,6 @@ import java.security.PrivilegedException
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
-import java.util.zip.GZIPInputStream;
 
 import javax.security.auth.Subject;
 import javax.servlet.ServletConfig;
@@ -48,7 +47,6 @@ import org.apache.qpid.server.model.Rest
 
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.SerializationFeature;
-import com.google.common.io.ByteStreams;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -56,14 +54,15 @@ import org.apache.qpid.server.management
 import org.apache.qpid.server.management.plugin.HttpManagementUtil;
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.ConfiguredObjectJacksonModule;
-import org.apache.qpid.server.model.StreamingContent;
 import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
+import org.apache.qpid.server.management.plugin.GZIPOutputStreamAdapter;
 
 public abstract class AbstractServlet extends HttpServlet
 {
     public static final int SC_UNPROCESSABLE_ENTITY = 422;
     private static final Logger LOGGER = LoggerFactory.getLogger(AbstractServlet.class);
     public static final String CONTENT_DISPOSITION = "Content-disposition";
+    public static final String CONTENT_LIMIT = "X-Content-Limit";
 
     private Broker<?> _broker;
     private HttpManagementConfiguration _managementConfiguration;
@@ -330,19 +329,22 @@ public abstract class AbstractServlet ex
     {
         Map<String, Object> headers = getResponseHeaders(content);
         boolean isGzipCompressed = GZIP_CONTENT_ENCODING.equals(headers.get(CONTENT_ENCODING_HEADER.toUpperCase()));
-        boolean isCompressing = HttpManagementUtil.isCompressing(request, _managementConfiguration);
-
-        if (isGzipCompressed && content instanceof StreamingContent
-            && ((((StreamingContent) content).getLimit() >= 0 || !isCompressing)))
+        boolean isCompressingAccepted = HttpManagementUtil.isCompressingAccepted(request, _managementConfiguration);
+        if (isGzipCompressed && !isCompressingAccepted)
         {
             headers.remove(CONTENT_ENCODING_HEADER);
-            content = new DecompressingContent((StreamingContent) content);
-            isGzipCompressed = false;
+        }
+        long limit = -1;
+        if (headers.containsKey(CONTENT_LIMIT.toUpperCase()))
+        {
+            limit = Long.parseLong(String.valueOf(headers.get(CONTENT_LIMIT.toUpperCase())));
         }
 
-        try (OutputStream os = isGzipCompressed && isCompressing
+        try (OutputStream os = isGzipCompressed && isCompressingAccepted && limit < 0
                 ? response.getOutputStream()
-                : getOutputStream(request, response))
+                : isGzipCompressed && (!isCompressingAccepted || limit >= 0)
+                        ? new GZIPOutputStreamAdapter(getOutputStream(request, response), limit)
+                        : getOutputStream(request, response))
         {
             response.setStatus(HttpServletResponse.SC_OK);
             for (Map.Entry<String, Object> entry : headers.entrySet())
@@ -422,33 +424,4 @@ public abstract class AbstractServlet ex
         return results;
     }
 
-    private class DecompressingContent implements Content
-    {
-        private final StreamingContent _content;
-
-        public DecompressingContent(final StreamingContent content)
-        {
-            _content = content;
-        }
-
-        @Override
-        public void write(final OutputStream os) throws IOException
-        {
-            try (GZIPInputStream gzipInputStream = new GZIPInputStream(_content.getInputStream());)
-            {
-                ByteStreams.copy(_content.getLimit() >= 0
-                                         ? ByteStreams.limit(gzipInputStream, _content.getLimit())
-                                         : gzipInputStream, os);
-            }
-        }
-
-        @Override
-        public void release()
-        {
-            if (_content instanceof Content)
-            {
-                ((Content)_content).release();
-            }
-        }
-    }
 }

Added: qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java?rev=1761399&view=auto
==============================================================================
--- qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java (added)
+++ qpid/java/trunk/broker-plugins/management-http/src/test/java/org/apache/qpid/server/management/plugin/GZIPOutputStreamAdapterTest.java Mon Sep 19 08:35:06 2016
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.management.plugin;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.zip.GZIPOutputStream;
+
+import org.apache.qpid.test.utils.QpidTestCase;
+
+
+public class GZIPOutputStreamAdapterTest extends QpidTestCase
+{
+
+    public void testDecompressing() throws IOException
+    {
+        String testText = generateTestText();
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        GZIPOutputStreamAdapter adapter = new GZIPOutputStreamAdapter(outputStream, 128, -1);
+
+        compressAndDecompressWithAdapter(testText, adapter);
+
+        assertEquals("Unexpected content", testText, new String(outputStream.toByteArray()));
+    }
+
+    public void testDecompressingLimited() throws IOException
+    {
+        String testText = generateTestText();
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        GZIPOutputStreamAdapter adapter = new GZIPOutputStreamAdapter(outputStream, 128, testText.length() / 2);
+
+        compressAndDecompressWithAdapter(testText, adapter);
+
+        assertEquals("Unexpected content",
+                     testText.substring(0, testText.length() / 2),
+                     new String(outputStream.toByteArray()));
+    }
+
+    private void compressAndDecompressWithAdapter(final String testText, final GZIPOutputStreamAdapter adapter) throws IOException
+    {
+        byte[] data = compress(testText);
+        byte[] buffer = new byte[256]; // compressed data is fed to the adapter in chunks of at most 256 bytes
+        int remaining = data.length;
+        int written = 0;
+        while (remaining > 0)
+        {
+            int length = Math.min(remaining, buffer.length);
+            System.arraycopy(data, written, buffer, 0, length);
+            adapter.write(buffer, 0, length); // write only this chunk; bytes past 'length' are stale
+            written += length;
+            remaining -= length;
+        }
+    }
+
+    private byte[] compress(final String testText) throws IOException
+    {
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        try (GZIPOutputStream zipStream = new GZIPOutputStream(baos))
+        {
+            zipStream.write(testText.getBytes());
+        }
+        return baos.toByteArray();
+    }
+
+    private String generateTestText()
+    {
+        StringBuilder sb = new StringBuilder();
+        int i = 0;
+        while (sb.length() < 5000)
+        {
+            sb.append(" A simple test text ").append(i++);
+        }
+        return sb.toString();
+    }
+}
\ No newline at end of file



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org