You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2015/10/19 00:03:07 UTC
svn commit: r1709323 - in /qpid/java/trunk:
broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
common/src/main/java/org/apache/qpid...
Author: kwall
Date: Sun Oct 18 22:03:07 2015
New Revision: 1709323
URL: http://svn.apache.org/viewvc?rev=1709323&view=rev
Log:
QPID-6800: [Java Broker] Use cached buffers for message compression/decompression
CompositeInputStream based on an implementation from Apache Axis2.
Added:
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/
qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/
qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java
Modified: qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java?rev=1709323&r1=1709322&r2=1709323&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java Sun Oct 18 22:03:07 2015
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -28,6 +29,9 @@ import java.util.concurrent.CopyOnWriteA
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
@@ -61,6 +65,7 @@ import org.apache.qpid.util.GZIPUtils;
public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConsumerTarget_0_10.class);
private static final Option[] BATCHED = new Option[] { Option.BATCH };
@@ -271,56 +276,50 @@ public class ConsumerTarget_0_10 extends
boolean msgCompressed = messageProps != null && GZIPUtils.GZIP_CONTENT_ENCODING.equals(messageProps.getContentEncoding());
- Collection<QpidByteBuffer> body = msg.getBody();
+ Collection<QpidByteBuffer> bodyBuffers = msg.getBody();
boolean compressionSupported = _session.getConnection().getConnectionDelegate().isCompressionSupported();
- if(msgCompressed && !compressionSupported)
+ if(msgCompressed && !compressionSupported && bodyBuffers != null)
{
- byte[] uncompressed = GZIPUtils.uncompressBufferToArray(ByteBufferUtils.combine(body));
- if(uncompressed != null)
+ Collection<QpidByteBuffer> uncompressedBuffers = inflateIfPossible(bodyBuffers);
+ messageProps.setContentEncoding(null);
+ for (QpidByteBuffer buf : bodyBuffers)
{
- messageProps.setContentEncoding(null);
- for (QpidByteBuffer buf : body)
- {
- buf.dispose();
- }
- body = Collections.singleton(QpidByteBuffer.wrap(uncompressed));
+ buf.dispose();
}
+ bodyBuffers = uncompressedBuffers;
}
else if(!msgCompressed
&& compressionSupported
- && (messageProps == null || messageProps.getContentEncoding()==null)
- && body != null
- && ByteBufferUtils.remaining(body) > _session.getConnection().getMessageCompressionThreshold())
+ && (messageProps == null || messageProps.getContentEncoding() == null)
+ && bodyBuffers != null
+ && ByteBufferUtils.remaining(bodyBuffers) > _session.getConnection().getMessageCompressionThreshold())
{
- byte[] compressed = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(body));
- if(compressed != null)
+ Collection<QpidByteBuffer> compressedBuffers = deflateIfPossible(bodyBuffers);
+ if(messageProps == null)
{
- if(messageProps == null)
- {
- messageProps = new MessageProperties();
- }
- messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
- for (QpidByteBuffer buf : body)
- {
- buf.dispose();
- }
- body = Collections.singleton(QpidByteBuffer.wrap(compressed));
+ messageProps = new MessageProperties();
}
+ messageProps.setContentEncoding(GZIPUtils.GZIP_CONTENT_ENCODING);
+ for (QpidByteBuffer buf : bodyBuffers)
+ {
+ buf.dispose();
+ }
+ bodyBuffers = compressedBuffers;
}
Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
- xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, body, BATCHED)
- : new MessageTransfer(_name,_acceptMode,_acquireMode,header, body);
- if (body != null)
+ xfr = batch ? new MessageTransfer(_name,_acceptMode,_acquireMode,header, bodyBuffers, BATCHED)
+ : new MessageTransfer(_name,_acceptMode,_acquireMode,header, bodyBuffers);
+ if (bodyBuffers != null)
{
- for (QpidByteBuffer buf : body)
+ for (QpidByteBuffer buf : bodyBuffers)
{
buf.dispose();
}
- body = null;
+ bodyBuffers = null;
}
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode != MessageAcquireMode.PRE_ACQUIRED)
{
@@ -712,4 +711,32 @@ public class ConsumerTarget_0_10 extends
{
return "ConsumerTarget_0_10[name=" + _name + ", session=" + _session.toLogString() + "]";
}
+
+
+    /**
+     * Attempts to gzip-compress the given message body buffers, treating failure as non-fatal.
+     *
+     * NOTE(review): for the message to really be "sent as is" after a failure, the caller has to
+     * check for the null return before replacing or disposing the original buffers - confirm call sites.
+     *
+     * @param buffers the uncompressed body buffers
+     * @return the buffers produced by {@link QpidByteBuffer#deflate(Collection)}, or {@code null} if
+     *         compression failed with an {@link IOException} (the failure is logged at WARN)
+     */
+    private Collection<QpidByteBuffer> deflateIfPossible(final Collection<QpidByteBuffer> buffers)
+    {
+        Collection<QpidByteBuffer> deflated = null;
+        try
+        {
+            deflated = QpidByteBuffer.deflate(buffers);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Unable to compress message payload for consumer with gzip, message will be sent as is", e);
+        }
+        return deflated;
+    }
+
+    /**
+     * Attempts to gzip-decompress the given message body buffers, treating failure as non-fatal.
+     *
+     * NOTE(review): for the message to really be "sent as is" after a failure, the caller has to
+     * check for the null return before replacing or disposing the original buffers - confirm call sites.
+     *
+     * @param buffers the compressed body buffers
+     * @return the buffers produced by {@link QpidByteBuffer#inflate(Collection)}, or {@code null} if
+     *         decompression failed with an {@link IOException} (the failure is logged at WARN)
+     */
+    private Collection<QpidByteBuffer> inflateIfPossible(final Collection<QpidByteBuffer> buffers)
+    {
+        Collection<QpidByteBuffer> inflated = null;
+        try
+        {
+            inflated = QpidByteBuffer.inflate(buffers);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Unable to decompress message payload for consumer with gzip, message will be sent as is", e);
+        }
+        return inflated;
+    }
+
}
Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java?rev=1709323&r1=1709322&r2=1709323&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java Sun Oct 18 22:03:07 2015
@@ -24,7 +24,6 @@ import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.List;
import org.slf4j.Logger;
@@ -106,53 +105,55 @@ public class ProtocolOutputConverterImpl
int bodySize = (int) message.getSize();
boolean msgCompressed = isCompressed(contentHeaderBody);
- byte[] modifiedContent;
+ Collection<QpidByteBuffer> modifiedContentBuffers = null;
- // straight through case
boolean compressionSupported = _connection.isCompressionSupported();
- Collection<QpidByteBuffer> buffers = message.getContent();
+ Collection<QpidByteBuffer> contentBuffers = message.getContent();
long length;
if(msgCompressed
&& !compressionSupported
- && (buffers != null)
- && (modifiedContent = GZIPUtils.uncompressBufferToArray(
- ByteBufferUtils.combine(buffers))) != null)
+ && (contentBuffers != null)
+ && (modifiedContentBuffers = inflateIfPossible(contentBuffers)) != null)
{
BasicContentHeaderProperties modifiedProps =
new BasicContentHeaderProperties(contentHeaderBody.getProperties());
modifiedProps.setEncoding((String)null);
- writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent);
-
- length = modifiedContent.length;
- }
+ length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps);
+ }
else if(!msgCompressed
&& compressionSupported
&& contentHeaderBody.getProperties().getEncoding()==null
&& bodySize > _connection.getMessageCompressionThreshold()
- && (buffers != null)
- && (modifiedContent = GZIPUtils.compressBufferToArray(ByteBufferUtils.combine(buffers))) != null)
+ && (contentBuffers != null)
+ && (modifiedContentBuffers = deflateIfPossible(contentBuffers)) != null)
{
BasicContentHeaderProperties modifiedProps =
new BasicContentHeaderProperties(contentHeaderBody.getProperties());
modifiedProps.setEncoding(GZIP_ENCODING);
- writeMessageDeliveryModified(channelId, deliverBody, modifiedProps, modifiedContent);
-
- length = modifiedContent.length;
+ length = writeMessageDeliveryModified(modifiedContentBuffers, channelId, deliverBody, modifiedProps);
}
else
{
- writeMessageDeliveryUnchanged(buffers, contentHeaderBody, channelId, deliverBody, bodySize);
+ writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, contentHeaderBody, bodySize);
length = bodySize;
}
- if (buffers != null)
+ if (contentBuffers != null)
+ {
+ for (QpidByteBuffer buf : contentBuffers)
+ {
+ buf.dispose();
+ }
+ }
+
+ if (modifiedContentBuffers != null)
{
- for (QpidByteBuffer buf : buffers)
+ for(QpidByteBuffer buf : modifiedContentBuffers)
{
buf.dispose();
}
@@ -161,25 +162,45 @@ public class ProtocolOutputConverterImpl
return length;
}
- private int writeMessageDeliveryModified(final int channelId,
+    /**
+     * Attempts to gzip-compress the given content buffers, treating failure as non-fatal.
+     *
+     * @param buffers the uncompressed content buffers
+     * @return the buffers produced by {@link QpidByteBuffer#deflate(Collection)}, or {@code null} if
+     *         compression failed with an {@link IOException} (the failure is logged at WARN)
+     */
+    private Collection<QpidByteBuffer> deflateIfPossible(final Collection<QpidByteBuffer> buffers)
+    {
+        Collection<QpidByteBuffer> deflated = null;
+        try
+        {
+            deflated = QpidByteBuffer.deflate(buffers);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Unable to compress message payload for consumer with gzip, message will be sent as is", e);
+        }
+        return deflated;
+    }
+
+    /**
+     * Attempts to gzip-decompress the given content buffers, treating failure as non-fatal.
+     *
+     * @param buffers the compressed content buffers
+     * @return the buffers produced by {@link QpidByteBuffer#inflate(Collection)}, or {@code null} if
+     *         decompression failed with an {@link IOException} (the failure is logged at WARN)
+     */
+    private Collection<QpidByteBuffer> inflateIfPossible(final Collection<QpidByteBuffer> buffers)
+    {
+        Collection<QpidByteBuffer> inflated = null;
+        try
+        {
+            inflated = QpidByteBuffer.inflate(buffers);
+        }
+        catch (IOException e)
+        {
+            LOGGER.warn("Unable to decompress message payload for consumer with gzip, message will be sent as is", e);
+        }
+        return inflated;
+    }
+
+ private int writeMessageDeliveryModified(final Collection<QpidByteBuffer> contentBuffers, final int channelId,
final AMQBody deliverBody,
- final BasicContentHeaderProperties modifiedProps,
- final byte[] content)
+ final BasicContentHeaderProperties modifiedProps)
{
- final int bodySize;
- bodySize = content.length;
- ContentHeaderBody modifiedHeaderBody =
- new ContentHeaderBody(modifiedProps, bodySize);
- writeMessageDeliveryUnchanged(Collections.singleton(QpidByteBuffer.wrap(content)),
- modifiedHeaderBody, channelId, deliverBody, bodySize);
+ final int bodySize = ByteBufferUtils.remaining(contentBuffers);
+ ContentHeaderBody modifiedHeaderBody = new ContentHeaderBody(modifiedProps, bodySize);
+ writeMessageDeliveryUnchanged(contentBuffers, channelId, deliverBody, modifiedHeaderBody, bodySize);
return bodySize;
}
- private void writeMessageDeliveryUnchanged(Collection<QpidByteBuffer> messageBuffers,
- ContentHeaderBody contentHeaderBody,
- int channelId,
- AMQBody deliverBody,
+ private void writeMessageDeliveryUnchanged(Collection<QpidByteBuffer> contentBuffers,
+ int channelId, AMQBody deliverBody, ContentHeaderBody contentHeaderBody,
int bodySize)
{
if (bodySize == 0)
@@ -198,7 +219,7 @@ public class ProtocolOutputConverterImpl
int writtenSize = capacity;
- AMQBody firstContentBody = new MessageContentSourceBody(messageBuffers, 0, capacity);
+ AMQBody firstContentBody = new MessageContentSourceBody(contentBuffers, 0, capacity);
CompositeAMQBodyBlock
compositeBlock =
@@ -208,7 +229,7 @@ public class ProtocolOutputConverterImpl
while (writtenSize < bodySize)
{
capacity = bodySize - writtenSize > maxBodySize ? maxBodySize : bodySize - writtenSize;
- AMQBody body = new MessageContentSourceBody(messageBuffers, writtenSize, capacity);
+ AMQBody body = new MessageContentSourceBody(contentBuffers, writtenSize, capacity);
writtenSize += capacity;
writeFrame(new AMQFrame(channelId, body));
Modified: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java?rev=1709323&r1=1709322&r2=1709323&view=diff
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java (original)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java Sun Oct 18 22:03:07 2015
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.bytebuffer;
+import java.io.BufferedOutputStream;
import java.io.DataOutput;
import java.io.IOException;
import java.io.InputStream;
@@ -34,6 +35,8 @@ import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
@@ -44,6 +47,7 @@ import org.slf4j.LoggerFactory;
import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.streams.CompositeInputStream;
public final class QpidByteBuffer
{
@@ -573,6 +577,69 @@ public final class QpidByteBuffer
return engine.wrap(src, dest._buffer);
}
+    /**
+     * Decompresses gzip data spread across the given buffers into newly allocated buffers.
+     * <p>
+     * The input buffers are read in iteration order as one continuous stream. Each chunk of
+     * decompressed data (at most {@code _pooledBufferSize} bytes) is copied into its own buffer, which is
+     * direct if any of the input buffers is direct. The returned buffers are flipped, ready for reading;
+     * the caller is responsible for disposing them. The input buffers are not disposed by this method.
+     *
+     * @param compressedBuffers the buffers holding the gzip stream
+     * @return the uncompressed data, in order
+     * @throws IllegalArgumentException if {@code compressedBuffers} is null
+     * @throws IOException if the data is not a valid gzip stream or cannot be read; any buffers
+     *                     allocated before the failure are disposed first
+     */
+    public static Collection<QpidByteBuffer> inflate(Collection<QpidByteBuffer> compressedBuffers) throws IOException
+    {
+        if (compressedBuffers == null)
+        {
+            throw new IllegalArgumentException("buffer cannot be null");
+        }
+
+        boolean isDirect = false;
+        Collection<InputStream> streams = new ArrayList<>(compressedBuffers.size());
+        for (QpidByteBuffer buffer : compressedBuffers)
+        {
+            isDirect = isDirect || buffer.isDirect();
+            streams.add(buffer.asInputStream());
+        }
+
+        Collection<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
+        try (GZIPInputStream gzipInputStream = new GZIPInputStream(new CompositeInputStream(streams)))
+        {
+            byte[] buf = new byte[_pooledBufferSize];
+            int read;
+            while ((read = gzipInputStream.read(buf)) != -1)
+            {
+                QpidByteBuffer output = isDirect ? allocateDirect(read) : allocate(read);
+                // Track the buffer before filling it so that the failure path below can release it too.
+                uncompressedBuffers.add(output);
+                output.put(buf, 0, read);
+                output.flip();
+            }
+            return uncompressedBuffers;
+        }
+        catch (IOException | RuntimeException e)
+        {
+            // The caller never sees the partial result, so it must be released here to avoid a leak.
+            for (QpidByteBuffer allocated : uncompressedBuffers)
+            {
+                allocated.dispose();
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Gzip-compresses the data held in the given buffers into newly allocated buffers.
+     * <p>
+     * The input buffers are read in iteration order as one continuous stream. The compressed output is
+     * collected through a {@link QpidByteBufferOutputStream} created with the direct flag set if any of
+     * the input buffers is direct. The caller is responsible for disposing the returned buffers. The
+     * input buffers are not disposed by this method.
+     *
+     * @param uncompressedBuffers the buffers holding the data to compress
+     * @return the gzip stream, in order
+     * @throws IllegalArgumentException if {@code uncompressedBuffers} is null
+     * @throws IOException if reading or compressing fails; any output buffers accumulated before the
+     *                     failure are disposed first
+     */
+    public static Collection<QpidByteBuffer> deflate(Collection<QpidByteBuffer> uncompressedBuffers) throws IOException
+    {
+        if (uncompressedBuffers == null)
+        {
+            throw new IllegalArgumentException("buffer cannot be null");
+        }
+
+        boolean isDirect = false;
+        Collection<InputStream> streams = new ArrayList<>(uncompressedBuffers.size());
+        for (QpidByteBuffer buffer : uncompressedBuffers)
+        {
+            isDirect = isDirect || buffer.isDirect();
+            streams.add(buffer.asInputStream());
+        }
+
+        QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, _pooledBufferSize);
+
+        try (InputStream uncompressedInput = new CompositeInputStream(streams);
+             GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput, _pooledBufferSize)))
+        {
+            byte[] buf = new byte[16384];
+            int read;
+            while ((read = uncompressedInput.read(buf)) > -1)
+            {
+                gzipStream.write(buf, 0, read);
+            }
+        }
+        catch (IOException | RuntimeException e)
+        {
+            // The caller never sees the partial result, so it must be released here to avoid a leak.
+            for (QpidByteBuffer partial : compressedOutput.fetchAccumulatedBuffers())
+            {
+                partial.dispose();
+            }
+            throw e;
+        }
+
+        // output pipeline will be already flushed and closed
+
+        return compressedOutput.fetchAccumulatedBuffers();
+    }
public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> buffers) throws IOException
{
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java?rev=1709323&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java Sun Oct 18 22:03:07 2015
@@ -0,0 +1,106 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.bytebuffer;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * OutputStream implementation that yields a list of QpidByteBuffers that contain a copy
+ * of the incoming bytes. Use fetchAccumulatedBuffers to get the buffers. Caller
+ * has responsibility to dispose the buffers after use.
+ *
+ * Every write produces at least one new buffer, so it will normally be desirable to front
+ * this stream with java.io.BufferedOutputStream to minimise the number of writes and thus
+ * the number of buffers created.
+ *
+ * Not thread safe.
+ */
+public class QpidByteBufferOutputStream extends OutputStream
+{
+    private final LinkedList<QpidByteBuffer> _buffers = new LinkedList<>();
+    private final boolean _isDirect;
+    private final int _maximumBufferSize;
+    private boolean _closed;
+
+    /**
+     * @param isDirect true to accumulate into direct buffers, false for heap buffers
+     * @param size the maximum number of bytes held by any single accumulated buffer
+     * @throws IllegalArgumentException if {@code size} is not positive
+     */
+    public QpidByteBufferOutputStream(final boolean isDirect, final int size)
+    {
+        if (size <= 0)
+        {
+            // A non-positive limit could never make progress when splitting a write into buffers.
+            throw new IllegalArgumentException("size must be positive: " + size);
+        }
+        _isDirect = isDirect;
+        _maximumBufferSize = size;
+    }
+
+    @Override
+    public void write(int b) throws IOException
+    {
+        allocateDataBuffers(new byte[] {(byte) b}, 0, 1);
+    }
+
+    @Override
+    public void write(byte[] data) throws IOException
+    {
+        write(data, 0, data.length);
+    }
+
+    @Override
+    public void write(byte[] data, int offset, int len) throws IOException
+    {
+        allocateDataBuffers(data, offset, len);
+    }
+
+    /**
+     * Marks the stream closed; subsequent writes fail. Buffers accumulated so far remain available
+     * from {@link #fetchAccumulatedBuffers()}.
+     */
+    @Override
+    public void close() throws IOException
+    {
+        _closed = true;
+    }
+
+    /**
+     * Hands over the buffers accumulated since the previous call and forgets them.
+     *
+     * @return the accumulated buffers in write order, each positioned for reading; never null
+     */
+    public Collection<QpidByteBuffer> fetchAccumulatedBuffers()
+    {
+        Collection<QpidByteBuffer> bufs = new ArrayList<>(_buffers);
+        _buffers.clear();
+        return bufs;
+    }
+
+    /**
+     * Copies {@code len} bytes of {@code data}, starting at {@code offset}, into as many new buffers
+     * of at most {@code _maximumBufferSize} bytes as are required. A zero-length request adds nothing.
+     *
+     * @throws IOException if the stream has been closed
+     */
+    private void allocateDataBuffers(byte[] data, int offset, int len) throws IOException
+    {
+        if (_closed)
+        {
+            throw new IOException("Stream is closed");
+        }
+
+        int position = offset;
+        int remaining = len;
+        while (remaining > 0)
+        {
+            final int size = Math.min(_maximumBufferSize, remaining);
+
+            // Allocate exactly what this chunk needs, honouring the requested buffer kind.
+            final QpidByteBuffer current =
+                    _isDirect ? QpidByteBuffer.allocateDirect(size) : QpidByteBuffer.allocate(size);
+            current.put(data, position, size);
+            current.flip();
+            _buffers.add(current);
+
+            position += size;
+            remaining -= size;
+        }
+    }
+}
Added: qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java?rev=1709323&view=auto
==============================================================================
--- qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java (added)
+++ qpid/java/trunk/common/src/main/java/org/apache/qpid/streams/CompositeInputStream.java Sun Oct 18 22:03:07 2015
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.qpid.streams;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collection;
+import java.util.LinkedList;
+
+/**
+ * Allows a series of input streams to be treated as if they were one.
+ * <p>
+ * Streams are consumed in the order supplied. A stream is closed as soon as the composite moves past
+ * it to the next one; the stream being read when the data runs out, and any streams never reached,
+ * are closed by {@link #close()}.
+ * <p>
+ * NotThreadSafe
+ */
+public class CompositeInputStream extends InputStream
+{
+    private final LinkedList<InputStream> _inputStreams;
+    private InputStream _current = null;
+
+    /**
+     * @param streams the streams to concatenate, in read order
+     * @throws IllegalArgumentException if {@code streams} is null
+     */
+    public CompositeInputStream(Collection<InputStream> streams)
+    {
+        if (streams == null)
+        {
+            throw new IllegalArgumentException("streams cannot be null");
+        }
+        _inputStreams = new LinkedList<>(streams);
+    }
+
+    /**
+     * Reads the next byte, moving on to the following stream whenever the current one is exhausted.
+     *
+     * @return the byte as an int in the range 0-255, or -1 once every stream is exhausted
+     */
+    @Override
+    public int read() throws IOException
+    {
+        while (true)
+        {
+            if (_current != null)
+            {
+                int value = _current.read();
+                if (value != -1)
+                {
+                    return value;
+                }
+            }
+            if (!advance())
+            {
+                return -1;
+            }
+        }
+    }
+
+    /**
+     * Reads up to {@code len} bytes, continuing across stream boundaries until the request is
+     * satisfied or every stream is exhausted.
+     * <p>
+     * A stream is only abandoned once it reports end-of-stream (-1). A read that merely returns fewer
+     * bytes than requested is followed by another read of the same stream, so no data is skipped.
+     *
+     * @return the number of bytes read, or -1 if no bytes were read because every stream is exhausted
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException
+    {
+        int total = 0;
+        while (total < len)
+        {
+            int count = _current == null ? -1 : _current.read(b, off + total, len - total);
+            if (count == -1)
+            {
+                if (!advance())
+                {
+                    return total > 0 ? total : -1;
+                }
+            }
+            else if (count == 0)
+            {
+                // Defensive: a stream that makes no progress must not turn this loop into a spin.
+                break;
+            }
+            else
+            {
+                total += count;
+            }
+        }
+        return total;
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException
+    {
+        return read(b, 0, b.length);
+    }
+
+    /**
+     * @return the sum of {@code available()} over the current stream and all streams not yet started
+     */
+    @Override
+    public int available() throws IOException
+    {
+        int available = 0;
+        if (_current != null)
+        {
+            available = _current.available();
+        }
+        for (InputStream is : _inputStreams)
+        {
+            if (is != null)
+            {
+                available += is.available();
+            }
+        }
+        return available;
+    }
+
+    @Override
+    public boolean markSupported()
+    {
+        return false;
+    }
+
+    @Override
+    public void mark(final int readlimit)
+    {
+    }
+
+    @Override
+    public void reset() throws IOException
+    {
+        throw new IOException("mark/reset not supported");
+    }
+
+    /**
+     * Closes the current stream and every stream not yet started. All streams are attempted even if
+     * some fail.
+     *
+     * @throws IOException the first failure encountered; later failures are attached to it as
+     *                     suppressed exceptions
+     */
+    @Override
+    public void close() throws IOException
+    {
+        IOException firstFailure = null;
+        if (_current != null)
+        {
+            try
+            {
+                _current.close();
+                _current = null;
+            }
+            catch (IOException e)
+            {
+                firstFailure = e;
+            }
+        }
+        for (InputStream is : _inputStreams)
+        {
+            if (is == null)
+            {
+                continue;
+            }
+            try
+            {
+                is.close();
+            }
+            catch (IOException e)
+            {
+                if (firstFailure == null)
+                {
+                    firstFailure = e;
+                }
+                else
+                {
+                    firstFailure.addSuppressed(e);
+                }
+            }
+        }
+        if (firstFailure != null)
+        {
+            throw firstFailure;
+        }
+    }
+
+    /**
+     * Closes the exhausted current stream, if any, and makes the next queued stream current.
+     *
+     * @return false, leaving the current stream untouched, if no further stream is queued
+     */
+    private boolean advance() throws IOException
+    {
+        if (_inputStreams.isEmpty())
+        {
+            return false;
+        }
+        if (_current != null)
+        {
+            _current.close();
+        }
+        _current = _inputStreams.removeFirst();
+        return true;
+    }
+
+}
\ No newline at end of file
Added: qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java?rev=1709323&view=auto
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java (added)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java Sun Oct 18 22:03:07 2015
@@ -0,0 +1,90 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.bytebuffer;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import junit.framework.TestCase;
+import org.junit.Assert;
+
+/**
+ * Checks how {@link QpidByteBufferOutputStream} splits written bytes into accumulated buffers.
+ */
+public class QpidByteBufferOutputStreamTest extends TestCase
+{
+    private static final String[] BUFFER_NAMES = {"1st buffer", "2nd buffer"};
+
+    /** Each single-byte write yields its own one-byte buffer. */
+    public void testWriteByteByByte() throws Exception
+    {
+        QpidByteBufferOutputStream out = new QpidByteBufferOutputStream(false, 3);
+        out.write('a');
+        out.write('b');
+
+        assertAccumulatedBuffers(out, "a", "b");
+    }
+
+    /** Each array write that fits the maximum size yields one buffer holding just the written range. */
+    public void testWriteByteArrays() throws Exception
+    {
+        QpidByteBufferOutputStream out = new QpidByteBufferOutputStream(false, 8);
+        out.write("abcd".getBytes(), 0, 4);
+        out.write("_ef_".getBytes(), 1, 2);
+
+        assertAccumulatedBuffers(out, "abcd", "ef");
+    }
+
+    /** A write larger than the maximum size is split across buffers. */
+    public void testWriteByteArrays_ArrayTooLargeForSingleBuffer() throws Exception
+    {
+        QpidByteBufferOutputStream out = new QpidByteBufferOutputStream(false, 8);
+        out.write("abcdefghi".getBytes());
+
+        assertAccumulatedBuffers(out, "abcdefgh", "i");
+    }
+
+    /** Fetches the accumulated buffers and checks their number and, in order, their content. */
+    private void assertAccumulatedBuffers(QpidByteBufferOutputStream out, String... expectedContents)
+    {
+        Collection<QpidByteBuffer> accumulated = out.fetchAccumulatedBuffers();
+        assertEquals("Unexpected number of buffers", expectedContents.length, accumulated.size());
+
+        int index = 0;
+        for (QpidByteBuffer buffer : accumulated)
+        {
+            assertBufferContent(BUFFER_NAMES[index], buffer, expectedContents[index].getBytes());
+            index++;
+        }
+    }
+
+    /** Checks the remaining byte count of the buffer, then drains it and compares the bytes. */
+    private void assertBufferContent(String bufName, QpidByteBuffer buf, byte[] expectedBytes)
+    {
+        final int remaining = buf.remaining();
+        assertEquals(bufName + " has unexpected number of bytes", expectedBytes.length, remaining);
+
+        final byte[] actualBytes = new byte[remaining];
+        buf.get(actualBytes);
+        Assert.assertArrayEquals(bufName + " has unexpected content", expectedBytes, actualBytes);
+    }
+
+}
\ No newline at end of file
Modified: qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java?rev=1709323&r1=1709322&r2=1709323&view=diff
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java (original)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java Sun Oct 18 22:03:07 2015
@@ -20,13 +20,20 @@
package org.apache.qpid.bytebuffer;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+
+import org.junit.Assert;
+
import org.apache.qpid.test.utils.QpidTestCase;
public class QpidByteBufferTest extends QpidTestCase
{
- public static final int BUFFER_SIZE = 1;
- public static final int POOL_SIZE = 1 ;
+ public static final int BUFFER_SIZE = 10;
+ public static final int POOL_SIZE = 20;
@Override
protected void setUp() throws Exception
@@ -99,4 +106,50 @@ public class QpidByteBufferTest extends
// pass
}
}
+
+    /**
+     * Round-trips a 20 byte payload through deflate and inflate and checks that it comes back
+     * unchanged, split across exactly two buffers.
+     */
+    public void testDeflateInflate() throws Exception
+    {
+        final byte[] input = "aaabbbcccddddeeeffff".getBytes();
+
+        final Collection<QpidByteBuffer> deflated =
+                QpidByteBuffer.deflate(Collections.singleton(QpidByteBuffer.wrap(input)));
+        assertNotNull(deflated);
+
+        final Collection<QpidByteBuffer> inflated = QpidByteBuffer.inflate(deflated);
+        assertNotNull(inflated);
+        assertEquals("Inflated to an unexpected number of inflated buffers", 2, inflated.size());
+
+        final Iterator<QpidByteBuffer> inflatedItr = inflated.iterator();
+        final QpidByteBuffer first = inflatedItr.next();
+        final QpidByteBuffer second = inflatedItr.next();
+
+        assertEquals("Unexpected total remaining", input.length, first.remaining() + second.remaining());
+
+        // The first buffer must hold the leading part of the input...
+        final byte[] firstBytes = new byte[first.remaining()];
+        first.get(firstBytes);
+        final byte[] expectedFirst = Arrays.copyOf(input, firstBytes.length);
+        Assert.assertArrayEquals("Inflated buf1 has unexpected content", expectedFirst, firstBytes);
+
+        // ...and the second buffer whatever follows it.
+        final byte[] secondBytes = new byte[second.remaining()];
+        second.get(secondBytes);
+        final byte[] expectedSecond = Arrays.copyOfRange(input, firstBytes.length, input.length);
+        Assert.assertArrayEquals("Inflated buf2 has unexpected content", expectedSecond, secondBytes);
+    }
+
+    /**
+     * Inflating bytes that are not a gzip stream must be rejected with a ZipException.
+     */
+    public void testInflatingUncompressedBytes_ThrowsZipException() throws Exception
+    {
+        final QpidByteBuffer notCompressed = QpidByteBuffer.wrap("not_a_compressed_stream".getBytes());
+
+        java.util.zip.ZipException caught = null;
+        try
+        {
+            QpidByteBuffer.inflate(Collections.singleton(notCompressed));
+        }
+        catch (java.util.zip.ZipException ze)
+        {
+            caught = ze;
+        }
+
+        assertNotNull("Exception not thrown", caught);
+    }
+
}
Added: qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java?rev=1709323&view=auto
==============================================================================
--- qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java (added)
+++ qpid/java/trunk/common/src/test/java/org/apache/qpid/streams/CompositeInputStreamTest.java Sun Oct 18 22:03:07 2015
@@ -0,0 +1,132 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.streams;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Arrays;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests reading, availability reporting and closing of {@link CompositeInputStream}.
+ */
+public class CompositeInputStreamTest extends TestCase
+{
+
+    /** Single-byte reads run through the first stream and continue into the second. */
+    public void testReadByteByByte_MultipleStreams() throws Exception
+    {
+        InputStream bis1 = new ByteArrayInputStream("ab".getBytes());
+        InputStream bis2 = new ByteArrayInputStream("cd".getBytes());
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis1, bis2));
+
+        assertEquals("1st read byte unexpected", 'a', cis.read());
+        assertEquals("2nd read byte unexpected", 'b', cis.read());
+        assertEquals("3rd read byte unexpected", 'c', cis.read());
+        assertEquals("4th read byte unexpected", 'd', cis.read());
+
+        assertEquals("Expecting EOF", -1, cis.read());
+    }
+
+    /** A single array read may span the boundary between two streams. */
+    public void testReadByteArray_MultipleStreams() throws Exception
+    {
+        InputStream bis1 = new ByteArrayInputStream("ab".getBytes());
+        InputStream bis2 = new ByteArrayInputStream("cd".getBytes());
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis1, bis2));
+
+        byte[] buf = new byte[3];
+
+        int read1 = cis.read(buf);
+        assertEquals("Unexpected return value from 1st array read", 3, read1);
+        assertArrayEquals("Unexpected bytes from 1st array read", "abc".getBytes(), buf);
+
+        int read2 = cis.read(buf);
+        assertEquals("Unexpected return value from 2nd array read", 1, read2);
+        assertArrayEquals("Unexpected bytes from 2nd array read", "d".getBytes(), Arrays.copyOf(buf, 1));
+
+        int read3 = cis.read(buf);
+        assertEquals("Expecting EOF", -1, read3);
+    }
+
+    /** Array reads and single-byte reads can be interleaved on the same stream. */
+    public void testReadsMixed_SingleStream() throws Exception
+    {
+        InputStream bis = new ByteArrayInputStream("abcd".getBytes());
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis));
+
+        byte[] buf = new byte[3];
+
+        int read1 = cis.read(buf);
+        assertEquals("Unexpected return value from 1st array read", 3, read1);
+        assertArrayEquals("Unexpected bytes from 1st array read", "abc".getBytes(), buf);
+
+        assertEquals("1st read byte unexpected", 'd', cis.read());
+
+        assertEquals("Expecting EOF", -1, cis.read(buf));
+    }
+
+    /** available() reports the bytes left across all streams, not just the current one. */
+    public void testAvailable_MultipleStreams() throws Exception
+    {
+        InputStream bis1 = new ByteArrayInputStream("ab".getBytes());
+        InputStream bis2 = new ByteArrayInputStream("cd".getBytes());
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis1, bis2));
+
+        assertEquals("Unexpected number of available bytes before read", 4, cis.available());
+        cis.read();
+        assertEquals("Unexpected number of available bytes after 1st read", 3, cis.available());
+        cis.read();
+        cis.read();
+        assertEquals("Unexpected number of available bytes after 3rd read", 1, cis.available());
+        cis.read();
+        assertEquals("Unexpected number of available bytes after last byte read", 0, cis.available());
+    }
+
+    /** close() closes every underlying stream, and a read afterwards reaches the (now closed) stream. */
+    public void testClose() throws Exception
+    {
+        InputStream bis1 = mock(InputStream.class);
+        InputStream bis2 = mock(InputStream.class);
+
+        CompositeInputStream cis = new CompositeInputStream(Arrays.asList(bis1, bis2));
+
+        cis.close();
+        // Both underlying streams must have been closed, not just the first.
+        verify(bis1).close();
+        verify(bis2).close();
+        when(bis1.read()).thenThrow(new IOException("mocked stream closed"));
+
+        try
+        {
+            cis.read();
+            fail("Exception not thrown");
+        }
+        catch(IOException ioe)
+        {
+            // PASS
+        }
+    }
+
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org