You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/10/19 00:03:07 UTC

svn commit: r1709323 - in /qpid/java/trunk: broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ common/src/main/java/org/apache/qpid...

Author: kwall
Date: Sun Oct 18 22:03:07 2015
New Revision: 1709323

URL: http://svn.apache.org/viewvc?rev=1709323&view=rev
Log:
QPID-6800: [Java Broker] Use cached buffers for message compression/decompression

CompositeInputStream based on an implementation from Apache Axis2.

Added:
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/
    qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/
    qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java
Modified:
    qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
    qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1709323&r1=1709322&r2=1709323&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Sun Oct 18 22:03:07 2015
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.server.protocol.v0_10;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -28,6 +29,9 @@ import java.util.concurrent.CopyOnWriteA
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import org.apache.qpid.bytebuffer.QpidByteBuffer;
 import org.apache.qpid.server.consumer.AbstractConsumerTarget;
 import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -61,6 +65,7 @@ import org.apache.qpid.util.GZIPUtils;
 
 public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
 {
+    private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_0_10.class);
 
     private static final Option[] BATCHED = new Option[] { Option.BATCH };
 
@@ -271,56 +276,50 @@ public class ConsumerTarget_0_10 extends
         boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());
 
 
-        Collection<QpidByteBuffer> body = msg.getBody();
+        Collection<QpidByteBuffer> bodyBuffers = msg.getBody();
 
         boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported();
 
-        if(msgCompressed && !compressionSupported)
+        if(msgCompressed && !compressionSupported && bodyBuffers != null)
         {
-            byte[] uncompressed = GZIPUtils.uncompressBufferToArray(ByteBufferUtils.combine(body));
-            if(uncompressed != null)
+            Collection<QpidByteBuffer> uncompressedBuffers = inflateIfPossible(bodyBuffers);
+            messageProps.setContentEncoding(null);
+            for (QpidByteBuffer buf : bodyBuffers)
             {
-                messageProps.setContentEncoding(null);
-                for (QpidByteBuffer buf : body)
-                {
-                    buf.dispose();
-                }
-                body = Collections.singleton(QpidByteBuffer.wrap(uncompressed));
+                buf.dispose();
             }
+            bodyBuffers = uncompressedBuffers;
         }
         else if(!msgCompressed
                 && compressionSupported
-                && (messageProps == null || messageProps.getContentEncoding()==null)
-                && body != null
-                && ByteBufferUtils.remaining(body) > _session.getConnection().getMessageCompressionThreshold())
+                && (messageProps == null || messageProps.getContentEncoding() == null)
+                && bodyBuffers != null
+                && ByteBufferUtils.remaining(bodyBuffers) > _session.getConnection().getMessageCompressionThreshold())
         {
-            byte[] compressed = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(body));
-            if(compressed != null)
+            Collection<QpidByteBuffer> compressedBuffers = deflateIfPossible(bodyBuffers);
+            if(messageProps == null)
             {
-                if(messageProps == null)
-                {
-                    messageProps = new MessageProperties();
-                }
-                messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
-                for (QpidByteBuffer buf : body)
-                {
-                    buf.dispose();
-                }
-                body = Collections.singleton(QpidByteBuffer.wrap(compressed));
+                messageProps = new MessageProperties();
             }
+            messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
+            for (QpidByteBuffer buf : bodyBuffers)
+            {
+                buf.dispose();
+            }
+            bodyBuffers = compressedBuffers;
         }
 
         Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
 
-        xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, body, BATCHED)
-                    : new MessageTransfer(_name,_acceptMode,_acquireMode,header, body);
-        if (body != null)
+        xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, bodyBuffers, BATCHED)
+                    : new MessageTransfer(_name,_acceptMode,_acquireMode,header, bodyBuffers);
+        if (bodyBuffers != null)
         {
-            for (QpidByteBuffer buf : body)
+            for (QpidByteBuffer buf : bodyBuffers)
             {
                 buf.dispose();
             }
-            body = null;
+            bodyBuffers = null;
         }
         if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
         {
@@ -712,4 +711,32 @@ public class ConsumerTarget_0_10 extends
     {
         return "ConsumerTarget_0_10[name=" + _name + ", session=" + _session.toLogString() + "]";
     }
+
+
+    private Collection<QpidByteBuffer> deflateIfPossible(final Collection<QpidByteBuffer> buffers)
+    {
+        try
+        {
+            return QpidByteBuffer.deflate(buffers);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Unable to compress message payload for consumer with gzip, message will be sent as is", e);
+            return null;
+        }
+    }
+
+    private Collection<QpidByteBuffer> inflateIfPossible(final Collection<QpidByteBuffer> buffers)
+    {
+        try
+        {
+            return QpidByteBuffer.inflate(buffers);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Unable to decompress message payload for consumer with gzip, message will be sent as is", e);
+            return null;
+        }
+    }
+
 }

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1709323&r1=1709322&r2=1709323&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Sun Oct 18 22:03:07 2015
@@ -24,7 +24,6 @@ import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.List;
 
 import org.slf4j.Logger;
@@ -106,53 +105,55 @@ public class ProtocolOutputConverterImpl
 
         int bodySize = (int) message.getSize();
         boolean msgCompressed = isCompressed(contentHeaderBody);
-        byte[] modifiedContent;
+        Collection<QpidByteBuffer> modifiedContentBuffers = null;
 
-        // straight through case
         boolean compressionSupported = _connection.isCompressionSupported();
 
-        Collection<QpidByteBuffer> buffers = message.getContent();
+        Collection<QpidByteBuffer> contentBuffers = message.getContent();
 
         long length;
         if(msgCompressed
            && !compressionSupported
-           && (buffers != null)
-           && (modifiedContent = GZIPUtils.uncompressBufferToArray(
-                ByteBufferUtils.combine(buffers))) != null)
+           && (contentBuffers != null)
+           && (modifiedContentBuffers = inflateIfPossible(contentBuffers)) != null)
         {
             BasicContentHeaderProperties modifiedProps =
                     new BasicContentHeaderProperties(contentHeaderBody.getProperties());
             modifiedProps.setEncoding((String)null);
 
-            writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent);
-
-            length = modifiedContent.length;
-        }
+            length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps);
+       }
         else if(!msgCompressed
                 && compressionSupported
                 && contentHeaderBody.getProperties().getEncoding()==null
                 && bodySize > _connection.getMessageCompressionThreshold()
-                && (buffers != null)
-                && (modifiedContent = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(buffers))) != null)
+                && (contentBuffers != null)
+                && (modifiedContentBuffers = deflateIfPossible(contentBuffers)) != null)
         {
             BasicContentHeaderProperties modifiedProps =
                     new BasicContentHeaderProperties(contentHeaderBody.getProperties());
             modifiedProps.setEncoding(GZIP_ENCODING);
 
-            writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent);
-
-            length = modifiedContent.length;
+            length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps);
         }
         else
         {
-            writeMessageDeliveryUnchanged(buffers, contentHeaderBody, channelId, deliverBody, bodySize);
+            writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, contentHeaderBody, bodySize);
 
             length = bodySize;
         }
 
-        if (buffers != null)
+        if (contentBuffers != null)
+        {
+            for (QpidByteBuffer buf : contentBuffers)
+            {
+                buf.dispose();
+            }
+        }
+
+        if (modifiedContentBuffers != null)
         {
-            for (QpidByteBuffer buf : buffers)
+            for(QpidByteBuffer buf : modifiedContentBuffers)
             {
                 buf.dispose();
             }
@@ -161,25 +162,45 @@ public class ProtocolOutputConverterImpl
         return length;
     }
 
-    private int writeMessageDeliveryModified(final int channelId,
+    private Collection<QpidByteBuffer> deflateIfPossible(final Collection<QpidByteBuffer> buffers)
+    {
+        try
+        {
+            return QpidByteBuffer.deflate(buffers);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Unable to compress message payload for consumer with gzip, message will be sent as is", e);
+            return null;
+        }
+    }
+
+    private Collection<QpidByteBuffer> inflateIfPossible(final Collection<QpidByteBuffer> buffers)
+    {
+        try
+        {
+            return QpidByteBuffer.inflate(buffers);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Unable to decompress message payload for consumer with gzip, message will be sent as is", e);
+            return null;
+        }
+    }
+
+    private int writeMessageDeliveryModified(final Collection<QpidByteBuffer> contentBuffers, final int channelId,
                                              final AMQBody deliverBody,
-                                             final BasicContentHeaderProperties modifiedProps,
-                                             final byte[] content)
+                                             final BasicContentHeaderProperties modifiedProps)
     {
-        final int bodySize;
-        bodySize = content.length;
-        ContentHeaderBody modifiedHeaderBody =
-                new ContentHeaderBody(modifiedProps, bodySize);
-        writeMessageDeliveryUnchanged(Collections.singleton(QpidByteBuffer.wrap(content)),
-                                      modifiedHeaderBody, channelId, deliverBody, bodySize);
+        final int bodySize = ByteBufferUtils.remaining(contentBuffers);
+        ContentHeaderBody modifiedHeaderBody = new ContentHeaderBody(modifiedProps, bodySize);
+        writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, modifiedHeaderBody, bodySize);
         return bodySize;
     }
 
 
-    private void writeMessageDeliveryUnchanged(Collection<QpidByteBuffer> messageBuffers,
-                                               ContentHeaderBody contentHeaderBody,
-                                               int channelId,
-                                               AMQBody deliverBody,
+    private void writeMessageDeliveryUnchanged(Collection<QpidByteBuffer> contentBuffers,
+                                               int channelId, AMQBody deliverBody, ContentHeaderBody contentHeaderBody,
                                                int bodySize)
     {
         if (bodySize == 0)
@@ -198,7 +219,7 @@ public class ProtocolOutputConverterImpl
 
             int writtenSize = capacity;
 
-            AMQBody firstContentBody = new MessageContentSourceBody(messageBuffers, 0, capacity);
+            AMQBody firstContentBody = new MessageContentSourceBody(contentBuffers, 0, capacity);
 
             CompositeAMQBodyBlock
                     compositeBlock =
@@ -208,7 +229,7 @@ public class ProtocolOutputConverterImpl
             while (writtenSize < bodySize)
             {
                 capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
-                AMQBody body = new MessageContentSourceBody(messageBuffers, writtenSize, capacity);
+                AMQBody body = new MessageContentSourceBody(contentBuffers, writtenSize, capacity);
                 writtenSize += capacity;
 
                 writeFrame(new AMQFrame(channelId, body));

Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1709323&r1=1709322&r2=1709323&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Sun Oct 18 22:03:07 2015
@@ -20,6 +20,7 @@
  */
 package org.apache.qpid.bytebuffer;
 
+import java.io.BufferedOutputStream;
 import java.io.DataOutput;
 import java.io.IOException;
 import java.io.InputStream;
@@ -34,6 +35,8 @@ import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
 
 import javax.net.ssl.SSLEngine;
 import javax.net.ssl.SSLEngineResult;
@@ -44,6 +47,7 @@ import org.slf4j.LoggerFactory;
 
 import org.apache.qpid.codec.MarkableDataInput;
 import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.streams.CompositeInputStream;
 
 public final class QpidByteBuffer
 {
@@ -573,6 +577,69 @@ public final class QpidByteBuffer
         return engine.wrap(src, dest._buffer);
     }
 
+    public static Collection<QpidByteBuffer> inflate(Collection<QpidByteBuffer> compressedBuffers) throws IOException
+    {
+        if (compressedBuffers == null)
+        {
+            throw new IllegalArgumentException("buffer cannot be null");
+        }
+
+        boolean isDirect = false;
+        Collection<InputStream> streams = new ArrayList<>(compressedBuffers.size());
+        for (QpidByteBuffer buffer : compressedBuffers)
+        {
+            isDirect = isDirect || buffer.isDirect();
+            streams.add(buffer.asInputStream());
+        }
+
+        Collection<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
+        try (GZIPInputStream gzipInputStream = new GZIPInputStream(new CompositeInputStream(streams)))
+        {
+            byte[] buf = new byte[_pooledBufferSize];
+            int read;
+            while ((read = gzipInputStream.read(buf)) != -1)
+            {
+                QpidByteBuffer output = isDirect ? allocateDirect(read) : allocate(read);
+                output.put(buf, 0, read);
+                output.flip();
+                uncompressedBuffers.add(output);
+            }
+            return uncompressedBuffers;
+        }
+    }
+
+    public static Collection<QpidByteBuffer> deflate(Collection<QpidByteBuffer> uncompressedBuffers) throws IOException
+    {
+        if (uncompressedBuffers == null)
+        {
+            throw new IllegalArgumentException("buffer cannot be null");
+        }
+
+        boolean isDirect = false;
+        Collection<InputStream> streams = new ArrayList<>(uncompressedBuffers.size());
+        for (QpidByteBuffer buffer : uncompressedBuffers)
+        {
+            isDirect = isDirect || buffer.isDirect();
+            streams.add(buffer.asInputStream());
+        }
+
+        QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, _pooledBufferSize);
+
+        try(InputStream compressedInput = new CompositeInputStream(streams);
+            GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput, _pooledBufferSize)))
+        {
+            byte[] buf = new byte[16384];
+            int read;
+            while ((read = compressedInput.read(buf)) > -1)
+            {
+                gzipStream.write(buf, 0, read);
+            }
+        }
+
+        // output pipeline will be already flushed and closed
+
+        return compressedOutput.fetchAccumulatedBuffers();
+    }
 
     public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> buffers) throws IOException
     {

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java?rev=1709323&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java Sun Oct 18 22:03:07 2015
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.bytebuffer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * OutputStream implementation that yields a list of QpidByteBuffers that contain a copy
+ * of the incoming bytes.  Use fetchAccumulatedBuffers to get the buffers.  Caller
+ * has responsibility to dispose the buffers after use.
+ *
+ * It will normally be desirable to front this stream with java.io.BufferedOutputStream
+ * to minimise the number of writes and thus the number of buffers created.
+ *
+ * Not thread safe.
+ */
+public class QpidByteBufferOutputStream extends OutputStream
+{
+    private final LinkedList<QpidByteBuffer> _buffers = new LinkedList<>();
+    private final boolean _isDirect;
+    private final int _maximumBufferSize;
+    private boolean _closed;
+
+    public QpidByteBufferOutputStream(final boolean isDirect, final int size)
+    {
+        _isDirect = isDirect;
+        _maximumBufferSize = size;
+    }
+
+    @Override
+    public void write(int b) throws IOException
+    {
+        int size = 1;
+        byte[] data = new byte[] {(byte)b};
+        allocateDataBuffers(data, 0, size);
+    }
+
+    @Override
+    public void write(byte[] data) throws IOException
+    {
+        write(data, 0, data.length);
+    }
+
+    @Override
+    public void write(byte[] data, int offset, int len) throws IOException
+    {
+        allocateDataBuffers(data, offset, len);
+    }
+
+    @Override
+    public void close() throws IOException
+    {
+        _closed = true;
+    }
+
+    public Collection<QpidByteBuffer> fetchAccumulatedBuffers()
+    {
+        Collection<QpidByteBuffer> bufs = new ArrayList<>(_buffers);
+        _buffers.clear();
+        return bufs;
+    }
+
+    // Copies len bytes of data (from offset) into new buffers holding at most _maximumBufferSize bytes each.
+    private void allocateDataBuffers(byte[] data, int offset, int len) throws IOException
+    {
+        if (_closed)
+        {
+            throw new IOException("Stream is closed");
+        }
+        int size = Math.min(_maximumBufferSize, len);
+        // Honour the requested buffer kind and allocate only what this chunk needs.
+        final QpidByteBuffer current =
+                _isDirect ? QpidByteBuffer.allocateDirect(size) : QpidByteBuffer.allocate(size);
+        current.put(data, offset, size);
+        current.flip();
+        _buffers.add(current);
+        if (len > size)
+        {
+            // The remainder did not fit in one buffer; spill it into further buffers.
+            allocateDataBuffers(data, offset + size, len - size);
+        }
+    }
+}

Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java?rev=1709323&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java Sun Oct 18 22:03:07 2015
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * Allows a series of input streams to be treated as if they were one.
+ * NotThreadSafe
+ */
+public class CompositeInputStream extends InputStream
+{
+    private final LinkedList<InputStream> _inputStreams;
+    private InputStream _current = null;
+
+    public CompositeInputStream(Collection<InputStream> streams)
+    {
+        if (streams == null)
+        {
+            throw new IllegalArgumentException("streams cannot be null");
+        }
+        _inputStreams = new LinkedList<>(streams);
+    }
+
+    @Override
+    public int read() throws IOException
+    {
+        int count = -1;
+        if (_current != null)
+        {
+            count = _current.read();
+        }
+        if (count == -1 && _inputStreams.size() > 0)
+        {
+            if (_current != null)
+            {
+                _current.close();
+            }
+            _current = _inputStreams.removeFirst();
+            count = read();
+        }
+        return count;
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        int count = -1;
+        if (_current != null)
+        {
+            count = _current.read(b, off, len);
+        }
+
+        if (count < len && _inputStreams.size() > 0)
+        {
+            if (_current != null)
+            {
+                _current.close();
+            }
+
+            _current = _inputStreams.removeFirst();
+            int numRead = count <= 0 ? 0 : count;
+
+            int recursiveCount = read(b, off + numRead, len - numRead);
+
+            if (recursiveCount == -1 && count == -1)
+            {
+                count = -1;
+            }
+            else if (recursiveCount == -1)
+            {
+                count = numRead;
+            }
+            else
+            {
+                count = recursiveCount + numRead;
+            }
+        }
+        return count;
+    }
+
+    public int read(byte[] b) throws IOException
+    {
+        return read(b, 0, b.length);
+    }
+
+    @Override
+    public int available() throws IOException
+    {
+
+        int available = 0;
+        if (_current != null)
+        {
+            available = _current.available();
+        }
+        if (_inputStreams != null)
+        {
+            for (InputStream is : _inputStreams)
+            {
+                if (is != null)
+                {
+                    available += is.available();
+                }
+            }
+        }
+        return available;
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return false;
+    }
+
+    @Override
+    public void mark(final int readlimit)
+    {
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        throw new IOException("mark/reset not supported");
+    }
+
+    /**
+     * Closes the stream currently being read and every stream not yet consumed.
+     * Each close is attempted even if an earlier one fails; the first IOException
+     * encountered is rethrown once all closes have been attempted.
+     */
+    @Override
+    public void close() throws IOException
+    {
+        IOException ioException = null;
+        if (_current != null)
+        {
+            try
+            {
+                _current.close();
+                _current = null;
+            }
+            catch (IOException e)
+            {
+                ioException = e;
+            }
+        }
+        for (InputStream is : _inputStreams)
+        {
+            try
+            {
+                is.close();
+            }
+            catch (IOException e)
+            {
+                // Remember only the first failure: it must neither be lost nor overwritten.
+                if (ioException == null)
+                {
+                    ioException = e;
+                }
+            }
+        }
+        if (ioException != null)
+        {
+            throw ioException;
+        }
+    }
+
+}
\ No newline at end of file

Added: qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java?rev=1709323&view=auto
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java (added)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java Sun Oct 18 22:03:07 2015
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.bytebuffer;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+
+public class QpidByteBufferOutputStreamTest extends TestCase
+{
+    public void testWriteByteByByte() throws Exception
+    {
+        QpidByteBufferOutputStream stream = new QpidByteBufferOutputStream(false, 3);
+        stream.write('a');
+        stream.write('b');
+
+        Collection<QpidByteBuffer> bufs = stream.fetchAccumulatedBuffers();
+        assertEquals("Unexpected number of buffers", 2, bufs.size());
+        Iterator<QpidByteBuffer> bufItr = bufs.iterator();
+
+        QpidByteBuffer buf1 = bufItr.next();
+        assertBufferContent("1st buffer", buf1, "a".getBytes());
+
+        QpidByteBuffer buf2 = bufItr.next();
+        assertBufferContent("2nd buffer", buf2, "b".getBytes());
+    }
+
+    public void testWriteByteArrays() throws Exception
+    {
+        QpidByteBufferOutputStream stream = new QpidByteBufferOutputStream(false, 8);
+        stream.write("abcd".getBytes(), 0, 4);
+        stream.write("_ef_".getBytes(), 1, 2);
+
+        Collection<QpidByteBuffer> bufs = stream.fetchAccumulatedBuffers();
+        assertEquals("Unexpected number of buffers", 2, bufs.size());
+        Iterator<QpidByteBuffer> bufItr = bufs.iterator();
+
+        QpidByteBuffer buf1 = bufItr.next();
+        assertBufferContent("1st buffer", buf1, "abcd".getBytes());
+
+        QpidByteBuffer buf2 = bufItr.next();
+        assertBufferContent("2nd buffer", buf2, "ef".getBytes());
+    }
+
+    public void testWriteByteArrays_ArrayTooLargeForSingleBuffer() throws Exception
+    {
+        QpidByteBufferOutputStream stream = new QpidByteBufferOutputStream(false, 8);
+        stream.write("abcdefghi".getBytes());
+
+        Collection<QpidByteBuffer> bufs = stream.fetchAccumulatedBuffers();
+        assertEquals("Unexpected number of buffers", 2, bufs.size());
+        Iterator<QpidByteBuffer> bufItr = bufs.iterator();
+
+        QpidByteBuffer buf1 = bufItr.next();
+        assertBufferContent("1st buffer", buf1, "abcdefgh".getBytes());
+
+        QpidByteBuffer buf2 = bufItr.next();
+        assertBufferContent("2nd buffer", buf2, "i".getBytes());
+    }
+
+    private void assertBufferContent(String bufName, QpidByteBuffer buf, byte[] expectedBytes)
+    {
+        assertEquals(bufName + " has unexpected number of bytes", expectedBytes.length, buf.remaining());
+        byte[] copy = new byte[buf.remaining()];
+        buf.get(copy);
+        Assert.assertArrayEquals(bufName + " has unexpected content", expectedBytes, copy);
+    }
+
+}
\ No newline at end of file

Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java?rev=1709323&r1=1709322&r2=1709323&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java Sun Oct 18 22:03:07 2015
@@ -20,13 +20,20 @@
 
 package org.apache.qpid.bytebuffer;
 
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.junit.Assert;
+
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class QpidByteBufferTest extends QpidTestCase
 {
 
-    public static final int BUFFER_SIZE = 1;
-    public static final int POOL_SIZE = 1   ;
+    public static final int BUFFER_SIZE = 10;
+    public static final int POOL_SIZE = 20;
 
     @Override
     protected void setUp() throws Exception
@@ -99,4 +106,50 @@ public class QpidByteBufferTest extends
             // pass
         }
     }
+
+    public void testDeflateInflate() throws Exception
+    {
+        // Round trip: deflate the wrapped input, inflate the result, compare against the input.
+        final byte[] input = "aaabbbcccddddeeeffff".getBytes();
+        final QpidByteBuffer wrapped = QpidByteBuffer.wrap(input);
+        final Collection<QpidByteBuffer> compressed = QpidByteBuffer.deflate(Collections.singleton(wrapped));
+        assertNotNull(compressed);
+
+        final Collection<QpidByteBuffer> restored = QpidByteBuffer.inflate(compressed);
+        assertNotNull(restored);
+        assertEquals("Inflated to an unexpected number of inflated buffers", 2, restored.size());
+
+        final Iterator<QpidByteBuffer> restoredItr = restored.iterator();
+        final QpidByteBuffer head = restoredItr.next();
+        final QpidByteBuffer tail = restoredItr.next();
+        final int headLength = head.remaining();
+        final int tailLength = tail.remaining();
+        assertEquals("Unexpected total remaining", input.length, headLength + tailLength);
+
+        // The first buffer must hold the leading bytes of the input, the second buffer the rest.
+        final byte[] headBytes = new byte[headLength];
+        head.get(headBytes);
+        Assert.assertArrayEquals("Inflated buf1 has unexpected content", Arrays.copyOf(input, headLength), headBytes);
+        final byte[] tailBytes = new byte[tailLength];
+        tail.get(tailBytes);
+        final byte[] expectedTail = Arrays.copyOfRange(input, headLength, input.length);
+        Assert.assertArrayEquals("Inflated buf2 has unexpected content", expectedTail, tailBytes);
+    }
+
+    public void testInflatingUncompressedBytes_ThrowsZipException() throws Exception
+    {
+        // The input was never deflated; inflate is expected to reject it with a ZipException.
+        QpidByteBuffer notCompressed = QpidByteBuffer.wrap("not_a_compressed_stream".getBytes());
+        boolean rejected = false;
+        try
+        {
+            QpidByteBuffer.inflate(Collections.singleton(notCompressed));
+        }
+        catch(java.util.zip.ZipException expected)
+        {
+            rejected = true;
+        }
+        assertTrue("Exception not thrown", rejected);
+    }
+
 }

Added: qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java?rev=1709323&view=auto
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java (added)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java Sun Oct 18 22:03:07 2015
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.streams;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+public class CompositeInputStreamTest extends TestCase
+{
+    // Exercises CompositeInputStream: single-byte reads, array reads, available() and close().
+    public void testReadByteByByte_MultipleStreams() throws Exception
+    {
+        InputStream bis1 = new ByteArrayInputStream("ab".getBytes());
+        InputStream bis2 = new ByteArrayInputStream("cd".getBytes());
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis1, bis2));
+        // Bytes must come out in order, crossing from the first stream into the second.
+        assertEquals("1st read byte unexpected", 'a', cis.read());
+        assertEquals("2nd read byte unexpected", 'b', cis.read());
+        assertEquals("3rd read byte unexpected", 'c', cis.read());
+        assertEquals("4th read byte unexpected", 'd', cis.read());
+
+        assertEquals("Expecting EOF", -1, cis.read());
+    }
+
+    public void testReadByteArray_MultipleStreams() throws Exception
+    {
+        InputStream bis1 = new ByteArrayInputStream("ab".getBytes());
+        InputStream bis2 = new ByteArrayInputStream("cd".getBytes());
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis1, bis2));
+
+        byte[] buf = new byte[3];
+        // A 3-byte read spans both underlying streams ("ab" + "c").
+        int read1 = cis.read(buf);
+        assertEquals("Unexpected return value from 1st array read", 3, read1);
+        assertArrayEquals("Unexpected bytes from 1st array read", "abc".getBytes(), buf);
+        // Only "d" is left, so the second read is short; compare just the first byte of buf.
+        int read2 = cis.read(buf);
+        assertEquals("Unexpected return value from 2nd array read", 1, read2);
+        assertArrayEquals("Unexpected bytes from 2nd array read", "d".getBytes(), Arrays.copyOf(buf, 1));
+
+        int read3 = cis.read(buf);
+        assertEquals("Expecting EOF", -1, read3);
+    }
+
+    public void testReadsMixed_SingleStream() throws Exception
+    {
+        InputStream bis = new ByteArrayInputStream("abcd".getBytes());
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis));
+
+        byte[] buf = new byte[3];
+        // An array read followed by a single-byte read on the same composite stream.
+        int read1 = cis.read(buf);
+        assertEquals("Unexpected return value from 1st array read", 3, read1);
+        assertArrayEquals("Unexpected bytes from 1st array read", "abc".getBytes(), buf);
+
+        assertEquals("1st read byte unexpected", 'd', cis.read());
+
+        assertEquals("Expecting EOF", -1, cis.read(buf));
+    }
+
+    public void testAvailable_MultipleStreams() throws Exception
+    {
+        InputStream bis1 = new ByteArrayInputStream("ab".getBytes());
+        InputStream bis2 = new ByteArrayInputStream("cd".getBytes());
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis1, bis2));
+        // available() is expected to count the unread bytes of both streams together.
+        assertEquals("Unexpected number of available bytes before read", 4, cis.available());
+        cis.read();
+        assertEquals("Unexpected number of available bytes after 1st read", 3, cis.available());
+        cis.read();
+        cis.read();
+        assertEquals("Unexpected number of available bytes after 3rd read", 1, cis.available());
+        cis.read();
+        assertEquals("Unexpected number of available bytes after last byte read", 0, cis.available());
+    }
+
+    public void testClose() throws Exception
+    {
+        InputStream bis1 = mock(InputStream.class);
+        InputStream bis2 = mock(InputStream.class);
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis1, bis2));
+        // Closing the composite must close every underlying stream, not only the first.
+        cis.close();
+        verify(bis1).close();
+        verify(bis2).close();
+        when(bis1.read()).thenThrow(new IOException("mocked stream closed"));
+        // The mock now behaves like a closed stream; the composite read must surface the IOException.
+        try
+        {
+            cis.read();
+            fail("Exception not thrown");
+        }
+        catch(IOException ioe)
+        {
+            // PASS
+        }
+    }
+
+}
\ No newline at end of file




---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org