You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/11/10 15:50:51 UTC
[4/4] qpid-jms-amqp-0-x git commit: QPID-7725: [Java Client, AMQP 0-x] Remove QpidByteBuffer
QPID-7725: [Java Client, AMQP 0-x] Remove QpidByteBuffer
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/95203e5d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/95203e5d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/95203e5d
Branch: refs/heads/master
Commit: 95203e5d1228ff5e84bb7993cb92d6212b1b476a
Parents: 8443f85
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Nov 10 15:50:19 2017 +0000
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Nov 10 15:50:19 2017 +0000
----------------------------------------------------------------------
.../org/apache/qpid/bytebuffer/BufferPool.java | 54 --
.../apache/qpid/bytebuffer/ByteBufferRef.java | 34 -
.../qpid/bytebuffer/NonPooledByteBufferRef.java | 57 --
.../qpid/bytebuffer/PooledByteBufferRef.java | 70 --
.../apache/qpid/bytebuffer/QpidByteBuffer.java | 884 ------------------
.../bytebuffer/QpidByteBufferInputStream.java | 109 ---
.../bytebuffer/QpidByteBufferOutputStream.java | 113 ---
.../qpid/client/BasicMessageProducer_0_8.java | 3 +-
.../message/AbstractJMSMessageFactory.java | 26 +-
.../message/Encrypted091MessageFactory.java | 3 +-
.../java/org/apache/qpid/codec/AMQDecoder.java | 31 +-
.../org/apache/qpid/codec/ClientDecoder.java | 23 +-
.../org/apache/qpid/codec/ServerDecoder.java | 251 -----
.../java/org/apache/qpid/framing/AMQFrame.java | 13 +-
.../apache/qpid/framing/AMQMethodBodyImpl.java | 41 +-
.../org/apache/qpid/framing/AMQShortString.java | 32 +-
.../java/org/apache/qpid/framing/AMQType.java | 83 +-
.../org/apache/qpid/framing/AMQTypedValue.java | 17 +-
.../apache/qpid/framing/AccessRequestBody.java | 7 +-
.../qpid/framing/AccessRequestOkBody.java | 10 +-
.../org/apache/qpid/framing/BasicAckBody.java | 7 +-
.../apache/qpid/framing/BasicCancelBody.java | 7 +-
.../apache/qpid/framing/BasicCancelOkBody.java | 7 +-
.../apache/qpid/framing/BasicConsumeBody.java | 10 +-
.../apache/qpid/framing/BasicConsumeOkBody.java | 7 +-
.../framing/BasicContentHeaderProperties.java | 41 +-
.../apache/qpid/framing/BasicDeliverBody.java | 7 +-
.../org/apache/qpid/framing/BasicGetBody.java | 10 +-
.../apache/qpid/framing/BasicGetEmptyBody.java | 7 +-
.../org/apache/qpid/framing/BasicGetOkBody.java | 10 +-
.../org/apache/qpid/framing/BasicNackBody.java | 7 +-
.../apache/qpid/framing/BasicPublishBody.java | 10 +-
.../org/apache/qpid/framing/BasicQosBody.java | 12 +-
.../org/apache/qpid/framing/BasicQosOkBody.java | 5 +-
.../apache/qpid/framing/BasicRecoverBody.java | 7 +-
.../qpid/framing/BasicRecoverSyncBody.java | 7 +-
.../qpid/framing/BasicRecoverSyncOkBody.java | 5 +-
.../apache/qpid/framing/BasicRejectBody.java | 7 +-
.../apache/qpid/framing/BasicReturnBody.java | 10 +-
.../apache/qpid/framing/ChannelAlertBody.java | 10 +-
.../apache/qpid/framing/ChannelCloseBody.java | 14 +-
.../apache/qpid/framing/ChannelCloseOkBody.java | 5 +-
.../apache/qpid/framing/ChannelFlowBody.java | 7 +-
.../apache/qpid/framing/ChannelFlowOkBody.java | 7 +-
.../qpid/framing/ChannelMethodProcessor.java | 4 +-
.../apache/qpid/framing/ChannelOpenBody.java | 7 +-
.../apache/qpid/framing/ChannelOpenOkBody.java | 8 +-
.../apache/qpid/framing/ConfirmSelectBody.java | 7 +-
.../qpid/framing/ConfirmSelectOkBody.java | 5 +-
.../qpid/framing/ConnectionCloseBody.java | 14 +-
.../qpid/framing/ConnectionCloseOkBody.java | 5 +-
.../apache/qpid/framing/ConnectionOpenBody.java | 7 +-
.../qpid/framing/ConnectionOpenOkBody.java | 7 +-
.../qpid/framing/ConnectionRedirectBody.java | 7 +-
.../qpid/framing/ConnectionSecureBody.java | 7 +-
.../qpid/framing/ConnectionSecureOkBody.java | 7 +-
.../qpid/framing/ConnectionStartBody.java | 12 +-
.../qpid/framing/ConnectionStartOkBody.java | 7 +-
.../apache/qpid/framing/ConnectionTuneBody.java | 14 +-
.../qpid/framing/ConnectionTuneOkBody.java | 14 +-
.../org/apache/qpid/framing/ContentBody.java | 23 +-
.../apache/qpid/framing/ContentHeaderBody.java | 39 +-
.../framing/ContentHeaderPropertiesFactory.java | 4 +-
.../org/apache/qpid/framing/EncodingUtils.java | 47 +-
.../apache/qpid/framing/ExchangeBoundBody.java | 7 +-
.../qpid/framing/ExchangeBoundOkBody.java | 11 +-
.../qpid/framing/ExchangeDeclareBody.java | 10 +-
.../qpid/framing/ExchangeDeclareOkBody.java | 5 +-
.../apache/qpid/framing/ExchangeDeleteBody.java | 10 +-
.../qpid/framing/ExchangeDeleteOkBody.java | 5 +-
.../org/apache/qpid/framing/FieldArray.java | 10 +-
.../org/apache/qpid/framing/FieldTable.java | 32 +-
.../framing/FrameCreatingMethodProcessor.java | 5 +-
.../org/apache/qpid/framing/HeartbeatBody.java | 4 +-
.../apache/qpid/framing/ProtocolInitiation.java | 8 +-
.../org/apache/qpid/framing/QueueBindBody.java | 10 +-
.../apache/qpid/framing/QueueBindOkBody.java | 5 +-
.../apache/qpid/framing/QueueDeclareBody.java | 10 +-
.../apache/qpid/framing/QueueDeclareOkBody.java | 12 +-
.../apache/qpid/framing/QueueDeleteBody.java | 10 +-
.../apache/qpid/framing/QueueDeleteOkBody.java | 10 +-
.../org/apache/qpid/framing/QueuePurgeBody.java | 10 +-
.../apache/qpid/framing/QueuePurgeOkBody.java | 10 +-
.../apache/qpid/framing/QueueUnbindBody.java | 10 +-
.../apache/qpid/framing/QueueUnbindOkBody.java | 5 +-
.../org/apache/qpid/framing/TxCommitBody.java | 5 +-
.../org/apache/qpid/framing/TxCommitOkBody.java | 5 +-
.../org/apache/qpid/framing/TxRollbackBody.java | 5 +-
.../apache/qpid/framing/TxRollbackOkBody.java | 5 +-
.../org/apache/qpid/framing/TxSelectBody.java | 5 +-
.../org/apache/qpid/framing/TxSelectOkBody.java | 5 +-
.../apache/qpid/transport/ByteBufferSender.java | 6 +-
.../apache/qpid/transport/MessageTransfer.java | 70 +-
.../java/org/apache/qpid/transport/Method.java | 13 +-
.../apache/qpid/transport/ProtocolHeader.java | 7 +-
.../qpid/transport/network/Assembler.java | 4 +-
.../qpid/transport/network/Disassembler.java | 74 +-
.../qpid/transport/network/io/IoSender.java | 10 +-
.../network/security/sasl/SASLSender.java | 13 +-
.../network/security/ssl/SSLSender.java | 38 +-
.../apache/qpid/transport/util/Functions.java | 55 --
.../org/apache/qpid/util/ByteBufferUtils.java | 86 +-
.../QpidByteBufferOutputStreamTest.java | 114 ---
.../qpid/bytebuffer/QpidByteBufferTest.java | 917 -------------------
.../message/Encrypted091MessageFactoryTest.java | 3 +-
.../org/apache/qpid/codec/AMQDecoderTest.java | 15 +-
.../BasicContentHeaderPropertiesTest.java | 5 +-
.../apache/qpid/framing/EncodingUtilsTest.java | 26 +-
.../org/apache/qpid/framing/FieldTableTest.java | 10 +-
109 files changed, 663 insertions(+), 3364 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java b/client/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
deleted file mode 100644
index cb0b5ba..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.bytebuffer;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-class BufferPool
-{
- private final int _maxSize;
- private final ConcurrentLinkedQueue<ByteBuffer> _pooledBuffers = new ConcurrentLinkedQueue<>();
-
- BufferPool(final int maxSize)
- {
- _maxSize = maxSize;
- }
-
- ByteBuffer getBuffer()
- {
- return _pooledBuffers.poll();
- }
-
- void returnBuffer(ByteBuffer buf)
- {
- buf.clear();
- if (_pooledBuffers.size() < _maxSize)
- {
- _pooledBuffers.add(buf);
- }
- }
-
- public int getMaxSize()
- {
- return _maxSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java b/client/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
deleted file mode 100644
index 30628e4..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.bytebuffer;
-
-import java.nio.ByteBuffer;
-
-public interface ByteBufferRef
-{
- void incrementRef();
-
- void decrementRef();
-
- ByteBuffer getBuffer();
-
- void removeFromPool();
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java b/client/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
deleted file mode 100644
index 6f0b8af..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.bytebuffer;
-
-import java.nio.ByteBuffer;
-
-class NonPooledByteBufferRef implements ByteBufferRef
-{
- private final ByteBuffer _buffer;
-
- NonPooledByteBufferRef(final ByteBuffer buffer)
- {
- _buffer = buffer;
- }
-
- @Override
- public void incrementRef()
- {
-
- }
-
- @Override
- public void decrementRef()
- {
-
- }
-
- @Override
- public ByteBuffer getBuffer()
- {
- return _buffer;
- }
-
- @Override
- public void removeFromPool()
- {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java b/client/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
deleted file mode 100644
index 807dfe9..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.bytebuffer;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-class PooledByteBufferRef implements ByteBufferRef
-{
- private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount");
-
- private final ByteBuffer _buffer;
- private volatile int _refCount;
-
- PooledByteBufferRef(final ByteBuffer buffer)
- {
- _buffer = buffer;
- }
-
- @Override
- public void incrementRef()
- {
-
- if(REF_COUNT.get(this) >= 0)
- {
- REF_COUNT.incrementAndGet(this);
- }
- }
-
- @Override
- public void decrementRef()
- {
- if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
- {
- QpidByteBuffer.returnToPool(_buffer);
- }
- }
-
- @Override
- public ByteBuffer getBuffer()
- {
- return _buffer.duplicate();
- }
-
- @Override
- public void removeFromPool()
- {
- REF_COUNT.set(this, Integer.MIN_VALUE/2);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
deleted file mode 100644
index eec2a05..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ /dev/null
@@ -1,884 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.bytebuffer;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
-
-import org.apache.qpid.streams.CompositeInputStream;
-
-public class QpidByteBuffer
-{
- private static final AtomicIntegerFieldUpdater<QpidByteBuffer>
- DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
- QpidByteBuffer.class,
- "_disposed");
- private static final ThreadLocal<QpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
- private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
- private volatile static boolean _isPoolInitialized;
- private volatile static BufferPool _bufferPool;
- private volatile static int _pooledBufferSize;
- private volatile static ByteBuffer _zeroed;
- private final int _offset;
-
- final ByteBufferRef _ref;
- volatile ByteBuffer _buffer;
- @SuppressWarnings("unused")
- private volatile int _disposed;
-
-
- QpidByteBuffer(ByteBufferRef ref)
- {
- this(ref, ref.getBuffer(), 0);
- }
-
- private QpidByteBuffer(ByteBufferRef ref, ByteBuffer buffer, int offset)
- {
- _ref = ref;
- _buffer = buffer;
- _offset = offset;
- _ref.incrementRef();
- }
-
- public final boolean isDirect()
- {
- return _buffer.isDirect();
- }
-
- public final short getUnsignedByte()
- {
- return (short) (((short) get()) & 0xFF);
- }
-
- public final int getUnsignedShort()
- {
- return ((int) getShort()) & 0xffff;
- }
-
- public final int getUnsignedShort(int pos)
- {
- return ((int) getShort(pos)) & 0xffff;
- }
-
-
- public final long getUnsignedInt()
- {
- return ((long) getInt()) & 0xffffffffL;
- }
-
- public final QpidByteBuffer putUnsignedByte(final short s)
- {
- put((byte) s);
- return this;
- }
-
- public final QpidByteBuffer putUnsignedShort(final int i)
- {
- putShort((short) i);
- return this;
- }
-
- public final QpidByteBuffer putUnsignedInt(final long value)
- {
- putInt((int) value);
- return this;
- }
-
- public final void dispose()
- {
- if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
- {
- _ref.decrementRef();
- _buffer = null;
- }
- }
-
- public final InputStream asInputStream()
- {
- return new BufferInputStream(this);
- }
-
- public final ByteBuffer asByteBuffer()
- {
- try
- {
- return getUnderlyingBuffer();
- }
- finally
- {
- dispose();
- }
- }
-
- public final CharBuffer decode(Charset charset)
- {
- return charset.decode(getUnderlyingBuffer());
- }
-
- public final int read(ReadableByteChannel channel) throws IOException
- {
- return channel.read(getUnderlyingBuffer());
- }
-
- public final SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) throws SSLException
- {
- return engine.unwrap(getUnderlyingBuffer(), dest.getUnderlyingBuffer());
- }
-
- @Override
- public String toString()
- {
- return "QpidByteBuffer{" +
- "_buffer=" + _buffer +
- ", _disposed=" + _disposed +
- '}';
- }
-
- public final boolean hasRemaining()
- {
- return _buffer.hasRemaining();
- }
-
- public QpidByteBuffer putInt(final int index, final int value)
- {
- _buffer.putInt(index, value);
- return this;
- }
-
- public QpidByteBuffer putShort(final int index, final short value)
- {
- _buffer.putShort(index, value);
- return this;
- }
-
- public QpidByteBuffer putChar(final int index, final char value)
- {
- _buffer.putChar(index, value);
- return this;
- }
-
- public final QpidByteBuffer put(final byte b)
- {
- _buffer.put(b);
- return this;
- }
-
- public QpidByteBuffer put(final int index, final byte b)
- {
- _buffer.put(index, b);
- return this;
- }
-
- public short getShort(final int index)
- {
- return _buffer.getShort(index);
- }
-
- public final QpidByteBuffer mark()
- {
- _buffer.mark();
- return this;
- }
-
- public final long getLong()
- {
- return _buffer.getLong();
- }
-
- public QpidByteBuffer putFloat(final int index, final float value)
- {
- _buffer.putFloat(index, value);
- return this;
- }
-
- public double getDouble(final int index)
- {
- return _buffer.getDouble(index);
- }
-
- public final boolean hasArray()
- {
- return _buffer.hasArray();
- }
-
- public final double getDouble()
- {
- return _buffer.getDouble();
- }
-
- public final QpidByteBuffer putFloat(final float value)
- {
- _buffer.putFloat(value);
- return this;
- }
-
- public final QpidByteBuffer putInt(final int value)
- {
- _buffer.putInt(value);
- return this;
- }
-
- public byte[] array()
- {
- return _buffer.array();
- }
-
- public final QpidByteBuffer putShort(final short value)
- {
- _buffer.putShort(value);
- return this;
- }
-
- public int getInt(final int index)
- {
- return _buffer.getInt(index);
- }
-
- public final int remaining()
- {
- return _buffer.remaining();
- }
-
- public final QpidByteBuffer put(final byte[] src)
- {
- _buffer.put(src);
- return this;
- }
-
- public final QpidByteBuffer put(final ByteBuffer src)
- {
- _buffer.put(src);
- return this;
- }
-
- public final QpidByteBuffer put(final QpidByteBuffer src)
- {
- int sourceRemaining = src.remaining();
- if (sourceRemaining > remaining())
- {
- throw new BufferOverflowException();
- }
-
- _buffer.put(src.getUnderlyingBuffer());
- return this;
- }
-
- public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
- {
- _buffer.get(dst, offset, length);
- return this;
- }
-
- public final QpidByteBuffer get(final ByteBuffer dst)
- {
- int destinationRemaining = dst.remaining();
- int remaining = remaining();
- if (destinationRemaining < remaining)
- {
- throw new BufferUnderflowException();
- }
- dst.put(_buffer);
- return this;
- }
-
- public final void copyTo(final ByteBuffer dst)
- {
- dst.put(_buffer.duplicate());
- }
-
- public final void putCopyOf(final QpidByteBuffer source)
- {
- int remaining = remaining();
- int sourceRemaining = source.remaining();
- if (sourceRemaining > remaining)
- {
- throw new BufferOverflowException();
- }
-
- put(source.getUnderlyingBuffer().duplicate());
- }
-
- public QpidByteBuffer rewind()
- {
- _buffer.rewind();
- return this;
- }
-
- public QpidByteBuffer clear()
- {
- _buffer.clear();
- return this;
- }
-
- public QpidByteBuffer putLong(final int index, final long value)
- {
- _buffer.putLong(index, value);
- return this;
- }
-
- public QpidByteBuffer compact()
- {
- _buffer.compact();
- return this;
- }
-
- public final QpidByteBuffer putDouble(final double value)
- {
- _buffer.putDouble(value);
- return this;
- }
-
- public int limit()
- {
- return _buffer.limit();
- }
-
- public QpidByteBuffer reset()
- {
- _buffer.reset();
- return this;
- }
-
- public QpidByteBuffer flip()
- {
- _buffer.flip();
- return this;
- }
-
- public final short getShort()
- {
- return _buffer.getShort();
- }
-
- public final float getFloat()
- {
- return _buffer.getFloat();
- }
-
- public QpidByteBuffer limit(final int newLimit)
- {
- _buffer.limit(newLimit);
- return this;
- }
-
- /**
- * Method does not respect mark.
- *
- * @return QpidByteBuffer
- */
- public QpidByteBuffer duplicate()
- {
- ByteBuffer buffer = _ref.getBuffer();
- if (!(_ref instanceof PooledByteBufferRef))
- {
- buffer = buffer.duplicate();
- }
-
- buffer.position(_offset );
- buffer.limit(_offset + _buffer.capacity());
-
- buffer = buffer.slice();
-
- buffer.limit(_buffer.limit());
- buffer.position(_buffer.position());
- return new QpidByteBuffer(_ref, buffer, _offset);
- }
-
- public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
- {
- _buffer.put(src, offset, length);
- return this;
- }
-
- public long getLong(final int index)
- {
- return _buffer.getLong(index);
- }
-
- public int capacity()
- {
- return _buffer.capacity();
- }
-
- public char getChar(final int index)
- {
- return _buffer.getChar(index);
- }
-
- public final byte get()
- {
- return _buffer.get();
- }
-
- public byte get(final int index)
- {
- return _buffer.get(index);
- }
-
- public final QpidByteBuffer get(final byte[] dst)
- {
- _buffer.get(dst);
- return this;
- }
-
- public final void copyTo(final byte[] dst)
- {
- if (remaining() < dst.length)
- {
- throw new BufferUnderflowException();
- }
- _buffer.duplicate().get(dst);
- }
-
- public final QpidByteBuffer putChar(final char value)
- {
- _buffer.putChar(value);
- return this;
- }
-
- public QpidByteBuffer position(final int newPosition)
- {
- _buffer.position(newPosition);
- return this;
- }
-
- public int arrayOffset()
- {
- return _buffer.arrayOffset();
- }
-
- public final char getChar()
- {
- return _buffer.getChar();
- }
-
- public final int getInt()
- {
- return _buffer.getInt();
- }
-
- public final QpidByteBuffer putLong(final long value)
- {
- _buffer.putLong(value);
- return this;
- }
-
- public float getFloat(final int index)
- {
- return _buffer.getFloat(index);
- }
-
- public QpidByteBuffer slice()
- {
- return view(0, _buffer.remaining());
- }
-
- public QpidByteBuffer view(int offset, int length)
- {
- ByteBuffer buffer = _ref.getBuffer();
- if (!(_ref instanceof PooledByteBufferRef))
- {
- buffer = buffer.duplicate();
- }
-
- int newRemaining = Math.min(_buffer.remaining() - offset, length);
-
- int newPosition = _offset + _buffer.position() + offset;
- buffer.limit(newPosition + newRemaining);
- buffer.position(newPosition);
-
- buffer = buffer.slice();
-
- return new QpidByteBuffer(_ref, buffer, newPosition);
- }
-
- public int position()
- {
- return _buffer.position();
- }
-
- public QpidByteBuffer putDouble(final int index, final double value)
- {
- _buffer.putDouble(index, value);
- return this;
- }
-
- ByteBuffer getUnderlyingBuffer()
- {
- return _buffer;
- }
-
- public static QpidByteBuffer allocate(boolean direct, int size)
- {
- return direct ? allocateDirect(size) : allocate(size);
- }
-
- public static QpidByteBuffer allocate(int size)
- {
- return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
- }
-
- public static QpidByteBuffer allocateDirect(int size)
- {
- if (size < 0)
- {
- throw new IllegalArgumentException("Cannot allocate QpidByteBuffer with size "
- + size
- + " which is negative.");
- }
-
- final ByteBufferRef ref;
- if (_isPoolInitialized && _pooledBufferSize >= size)
- {
- if (_pooledBufferSize == size)
- {
- ByteBuffer buf = _bufferPool.getBuffer();
- if (buf == null)
- {
- buf = ByteBuffer.allocateDirect(size);
- }
- ref = new PooledByteBufferRef(buf);
- }
- else
- {
- QpidByteBuffer buf = _cachedBuffer.get();
- if (buf == null || buf.remaining() < size)
- {
- if (buf != null)
- {
- buf.dispose();
- }
- buf = allocateDirect(_pooledBufferSize);
- }
- QpidByteBuffer rVal = buf.view(0, size);
- buf.position(buf.position() + size);
-
- _cachedBuffer.set(buf);
- return rVal;
- }
- }
- else
- {
- ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
- }
- return new QpidByteBuffer(ref);
- }
-
- public static Collection<QpidByteBuffer> allocateDirectCollection(int size)
- {
- if (_pooledBufferSize == 0)
- {
- return Collections.singleton(allocateDirect(size));
- }
- else
- {
- List<QpidByteBuffer> buffers = new ArrayList<>((size / _pooledBufferSize) + 2);
- int remaining = size;
-
- QpidByteBuffer buf = _cachedBuffer.get();
- if (buf == null)
- {
- buf = allocateDirect(_pooledBufferSize);
- }
- while (remaining > buf.remaining())
- {
- int bufRemaining = buf.remaining();
- if (buf == _cachedBuffer.get())
- {
- buffers.add(buf.view(0, bufRemaining));
- buf.dispose();
- }
- else
- {
- buffers.add(buf);
- }
- remaining -= bufRemaining;
- buf = allocateDirect(_pooledBufferSize);
- }
- buffers.add(buf.view(0, remaining));
- buf.position(buf.position() + remaining);
-
- if (buf.hasRemaining())
- {
- _cachedBuffer.set(buf);
- }
- else
- {
- _cachedBuffer.set(allocateDirect(_pooledBufferSize));
- buf.dispose();
- }
- return buffers;
- }
- }
-
- public static SSLEngineResult encryptSSL(SSLEngine engine,
- final Collection<QpidByteBuffer> buffers,
- QpidByteBuffer dest) throws SSLException
- {
- final ByteBuffer[] src;
- // QPID-7447: prevent unnecessary allocations
- if (buffers.isEmpty())
- {
- src = EMPTY_BYTE_BUFFER_ARRAY;
- }
- else
- {
- src = new ByteBuffer[buffers.size()];
- Iterator<QpidByteBuffer> iterator = buffers.iterator();
- for (int i = 0; i < src.length; i++)
- {
- src[i] = iterator.next().getUnderlyingBuffer();
- }
- }
- return engine.wrap(src, dest.getUnderlyingBuffer());
- }
-
- public static Collection<QpidByteBuffer> inflate(Collection<QpidByteBuffer> compressedBuffers) throws IOException
- {
- if (compressedBuffers == null)
- {
- throw new IllegalArgumentException("compressedBuffers cannot be null");
- }
-
- boolean isDirect = false;
- Collection<InputStream> streams = new ArrayList<>(compressedBuffers.size());
- for (QpidByteBuffer buffer : compressedBuffers)
- {
- isDirect = isDirect || buffer.isDirect();
- streams.add(buffer.asInputStream());
- }
- final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
-
- Collection<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
- try (GZIPInputStream gzipInputStream = new GZIPInputStream(new CompositeInputStream(streams)))
- {
- byte[] buf = new byte[bufferSize];
- int read;
- while ((read = gzipInputStream.read(buf)) != -1)
- {
- QpidByteBuffer output = isDirect ? allocateDirect(read) : allocate(read);
- output.put(buf, 0, read);
- output.flip();
- uncompressedBuffers.add(output);
- }
- return uncompressedBuffers;
- }
- catch (IOException e)
- {
- for (QpidByteBuffer uncompressedBuffer : uncompressedBuffers)
- {
- uncompressedBuffer.dispose();
- }
- throw e;
- }
- }
-
- public static Collection<QpidByteBuffer> deflate(Collection<QpidByteBuffer> uncompressedBuffers) throws IOException
- {
- if (uncompressedBuffers == null)
- {
- throw new IllegalArgumentException("uncompressedBuffers cannot be null");
- }
-
- boolean isDirect = false;
- Collection<InputStream> streams = new ArrayList<>(uncompressedBuffers.size());
- for (QpidByteBuffer buffer : uncompressedBuffers)
- {
- isDirect = isDirect || buffer.isDirect();
- streams.add(buffer.asInputStream());
- }
- final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
-
- try(QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
- InputStream compressedInput = new CompositeInputStream(streams);
- GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput, bufferSize)))
- {
- byte[] buf = new byte[16384];
- int read;
- while ((read = compressedInput.read(buf)) > -1)
- {
- gzipStream.write(buf, 0, read);
- }
- gzipStream.finish();
- gzipStream.flush();
- return compressedOutput.fetchAccumulatedBuffers();
- }
- }
-
- public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
- throws IOException
- {
- ByteBuffer[] byteBuffers = new ByteBuffer[qpidByteBuffers.size()];
- Iterator<QpidByteBuffer> iterator = qpidByteBuffers.iterator();
- for (int i = 0; i < byteBuffers.length; i++)
- {
- byteBuffers[i] = iterator.next().getUnderlyingBuffer();
- }
- return channel.write(byteBuffers);
- }
-
- public static QpidByteBuffer wrap(final ByteBuffer wrap)
- {
- return new QpidByteBuffer(new NonPooledByteBufferRef(wrap));
- }
-
- public static QpidByteBuffer wrap(final byte[] data)
- {
- return wrap(ByteBuffer.wrap(data));
- }
-
- public static QpidByteBuffer wrap(final byte[] data, int offset, int length)
- {
- return wrap(ByteBuffer.wrap(data, offset, length));
- }
-
- static void returnToPool(final ByteBuffer buffer)
- {
- buffer.clear();
- final ByteBuffer duplicate = _zeroed.duplicate();
- duplicate.limit(buffer.capacity());
- buffer.put(duplicate);
-
- _bufferPool.returnBuffer(buffer);
- }
-
- public synchronized static void initialisePool(int bufferSize, int maxPoolSize)
- {
- if (_isPoolInitialized && (bufferSize != _pooledBufferSize || maxPoolSize != _bufferPool.getMaxSize()))
- {
- final String errorMessage = String.format(
- "QpidByteBuffer pool has already been initialised with bufferSize=%d and maxPoolSize=%d." +
- "Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
- _pooledBufferSize,
- _bufferPool.getMaxSize(),
- bufferSize,
- maxPoolSize);
- throw new IllegalStateException(errorMessage);
- }
- if (bufferSize <= 0)
- {
- throw new IllegalArgumentException("Negative or zero bufferSize illegal : " + bufferSize);
- }
-
- _bufferPool = new BufferPool(maxPoolSize);
- _pooledBufferSize = bufferSize;
- _zeroed = ByteBuffer.allocateDirect(_pooledBufferSize);
- _isPoolInitialized = true;
- }
-
- public static int getPooledBufferSize()
- {
- return _pooledBufferSize;
- }
-
- private static final class BufferInputStream extends InputStream
- {
- private final QpidByteBuffer _qpidByteBuffer;
-
- private BufferInputStream(final QpidByteBuffer buffer)
- {
- _qpidByteBuffer = buffer;
- }
-
- @Override
- public int read() throws IOException
- {
- if (_qpidByteBuffer.hasRemaining())
- {
- return _qpidByteBuffer.get() & 0xFF;
- }
- return -1;
- }
-
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- if (!_qpidByteBuffer.hasRemaining())
- {
- return -1;
- }
- if (_qpidByteBuffer.remaining() < len)
- {
- len = _qpidByteBuffer.remaining();
- }
- _qpidByteBuffer.get(b, off, len);
-
- return len;
- }
-
- @Override
- public void mark(int readlimit)
- {
- _qpidByteBuffer.mark();
- }
-
- @Override
- public void reset() throws IOException
- {
- _qpidByteBuffer.reset();
- }
-
- @Override
- public boolean markSupported()
- {
- return true;
- }
-
- @Override
- public long skip(long n) throws IOException
- {
- _qpidByteBuffer.position(_qpidByteBuffer.position() + (int) n);
- return n;
- }
-
- @Override
- public int available() throws IOException
- {
- return _qpidByteBuffer.remaining();
- }
-
- @Override
- public void close()
- {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java b/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
deleted file mode 100644
index 54ed85a..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.bytebuffer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.apache.qpid.streams.CompositeInputStream;
-
-/**
- * InputStream implementation that takes a list of QpidByteBuffers.
- * The QpidByteBufferInputStream takes ownership of the buffers and disposes them on close().
- *
- * Not thread safe.
- */
-public class QpidByteBufferInputStream extends InputStream
-{
- private final CompositeInputStream _compositeInputStream;
- private final Collection<QpidByteBuffer> _buffers;
-
- public QpidByteBufferInputStream(Collection<QpidByteBuffer> buffers)
- {
- _buffers = buffers;
-
- final Collection<InputStream> streams = new ArrayList<>(buffers.size());
- for (QpidByteBuffer buffer : buffers)
- {
- streams.add(buffer.asInputStream());
- }
- _compositeInputStream = new CompositeInputStream(streams);
- }
-
- @Override
- public int read() throws IOException
- {
- return _compositeInputStream.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- return _compositeInputStream.read(b, off, len);
- }
-
- @Override
- public void mark(int readlimit)
- {
- _compositeInputStream.mark(readlimit);
- }
-
- @Override
- public void reset() throws IOException
- {
- _compositeInputStream.reset();
- }
-
- @Override
- public boolean markSupported()
- {
- return _compositeInputStream.markSupported();
- }
-
- @Override
- public long skip(long n) throws IOException
- {
- return _compositeInputStream.skip(n);
- }
-
- @Override
- public int available() throws IOException
- {
- return _compositeInputStream.available();
- }
-
- @Override
- public void close() throws IOException
- {
- try
- {
- _compositeInputStream.close();
- }
- finally
- {
- for (QpidByteBuffer buffer : _buffers)
- {
- buffer.dispose();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java b/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
deleted file mode 100644
index e772809..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.bytebuffer;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-
-/**
- * OutputStream implementation that yields a list of QpidByteBuffers that contain a copy
- * of the incoming bytes. Use fetchAccumulatedBuffers to get the buffers. Caller
- * has responsibility to dispose the buffers after use.
- *
- * It will normally be desirable to front this stream with java.io.BufferedOutputStream
- * to minimise the number of writes and thus the number of buffers created.
- *
- * Not thread safe.
- */
-public class QpidByteBufferOutputStream extends OutputStream
-{
- private final LinkedList<QpidByteBuffer> _buffers = new LinkedList<>();
- private final boolean _isDirect;
- private final int _maximumBufferSize;
- private boolean _closed;
-
- public QpidByteBufferOutputStream(final boolean isDirect, final int maximumBufferSize)
- {
- if (maximumBufferSize <= 0)
- {
- throw new IllegalArgumentException("Negative or zero maximumBufferSize illegal : " + maximumBufferSize);
- }
- _isDirect = isDirect;
- _maximumBufferSize = maximumBufferSize;
- }
-
- @Override
- public void write(int b) throws IOException
- {
- int size = 1;
- byte[] data = new byte[] {(byte)b};
- allocateDataBuffers(data, 0, size);
- }
-
- @Override
- public void write(byte[] data) throws IOException
- {
- write(data, 0, data.length);
- }
-
- @Override
- public void write(byte[] data, int offset, int len) throws IOException
- {
- allocateDataBuffers(data, offset, len);
- }
-
- @Override
- public void close() throws IOException
- {
- _closed = true;
- for (QpidByteBuffer buffer : _buffers)
- {
- buffer.dispose();
- }
- _buffers.clear();
- }
-
- public Collection<QpidByteBuffer> fetchAccumulatedBuffers()
- {
- Collection<QpidByteBuffer> bufs = new ArrayList<>(_buffers);
- _buffers.clear();
- return bufs;
- }
-
- private void allocateDataBuffers(byte[] data, int offset, int len) throws IOException
- {
- if (_closed)
- {
- throw new IOException("Stream is closed");
- }
-
- int size = Math.min(_maximumBufferSize, len);
-
- QpidByteBuffer current = _isDirect ? QpidByteBuffer.allocateDirect(len) : QpidByteBuffer.allocate(len);
- current.put(data, offset, size);
- current.flip();
- _buffers.add(current);
- if (len > size)
- {
- allocateDataBuffers(data, offset + size, len - size);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index a7dd2dc..41166e0 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -239,7 +238,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
final int headerLength = contentHeaderProperties.getPropertyListSize() + 2;
byte[] unencryptedBytes = new byte[headerLength + size];
- QpidByteBuffer output = QpidByteBuffer.wrap(unencryptedBytes);
+ ByteBuffer output = ByteBuffer.wrap(unencryptedBytes);
output.putShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff));
contentHeaderProperties.writePropertyListPayload(output);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index c4cf72a..8bb9178 100644
--- a/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -24,7 +24,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession_0_8;
import org.apache.qpid.client.AMQTopic;
@@ -78,7 +76,7 @@ public abstract class AbstractJMSMessageFactory
_logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
}
- data = ((ContentBody) bodies.get(0)).getPayload().asByteBuffer().duplicate();
+ data = ((ContentBody) bodies.get(0)).getPayload().duplicate();
}
else if (bodies != null)
{
@@ -93,7 +91,7 @@ public abstract class AbstractJMSMessageFactory
while (it.hasNext())
{
ContentBody cb = (ContentBody) it.next();
- final ByteBuffer payload = cb.getPayload().asByteBuffer().duplicate();
+ final ByteBuffer payload = cb.getPayload().duplicate();
if (payload.isDirect() || payload.isReadOnly())
{
data.put(payload);
@@ -133,24 +131,16 @@ public abstract class AbstractJMSMessageFactory
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps,
DeliveryProperties deliveryProps,
- Collection<QpidByteBuffer> body) throws QpidException
+ ByteBuffer body) throws QpidException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
- if (body != null && body.size() != 0)
+ if (body != null && body.remaining() != 0)
{
- int size = 0;
- for(QpidByteBuffer b : body)
- {
- size += b.remaining();
- }
- data = ByteBuffer.allocate(size);
- for(QpidByteBuffer b : body)
- {
- b.get(data);
- }
+ data = ByteBuffer.allocate(body.remaining());
+ data.put(body);
data.flip();
}
else // body == null
@@ -192,7 +182,7 @@ public abstract class AbstractJMSMessageFactory
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, MessageProperties msgProps,
- DeliveryProperties deliveryProps, Collection<QpidByteBuffer> body)
+ DeliveryProperties deliveryProps, ByteBuffer body)
throws JMSException, QpidException
{
final AbstractJMSMessage msg =
@@ -205,7 +195,7 @@ public abstract class AbstractJMSMessageFactory
private class BodyInputStream extends InputStream
{
private final Iterator<ContentBody> _bodiesIter;
- private QpidByteBuffer _currentBuffer;
+ private ByteBuffer _currentBuffer;
public BodyInputStream(final List<ContentBody> bodies)
{
_bodiesIter = bodies.iterator();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java b/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
index 187fbaf..3bae2d6 100644
--- a/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
+++ b/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -143,7 +142,7 @@ public class Encrypted091MessageFactory extends AbstractJMSMessageFactory
BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
int payloadOffset;
- QpidByteBuffer dataInput = QpidByteBuffer.wrap(unencryptedBytes);
+ ByteBuffer dataInput = ByteBuffer.wrap(unencryptedBytes);
payloadOffset = properties.read(dataInput);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index e0f04f7..febf46b 100644
--- a/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -20,12 +20,21 @@
*/
package org.apache.qpid.codec;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodProcessor;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.ErrorCodes;
+import org.apache.qpid.util.ByteBufferUtils;
/**
* AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
@@ -91,7 +100,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
return _methodProcessor;
}
- protected final int decode(final QpidByteBuffer buf) throws AMQFrameDecodingException
+ protected final int decode(final ByteBuffer buf) throws AMQFrameDecodingException
{
// If this is the first read then we may be getting a protocol initiation back if we tried to negotiate
// an unsupported version
@@ -124,7 +133,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
return buf.hasRemaining() ? required : 0;
}
- protected int processAMQPFrames(final QpidByteBuffer buf) throws AMQFrameDecodingException
+ protected int processAMQPFrames(final ByteBuffer buf) throws AMQFrameDecodingException
{
final int required = decodable(buf);
if (required == 0)
@@ -134,7 +143,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
return required;
}
- protected int decodable(final QpidByteBuffer in) throws AMQFrameDecodingException
+ protected int decodable(final ByteBuffer in) throws AMQFrameDecodingException
{
final int remainingAfterAttributes = in.remaining() - FRAME_HEADER_SIZE;
// type, channel, body length and end byte
@@ -160,13 +169,13 @@ public abstract class AMQDecoder<T extends MethodProcessor>
}
- protected void processInput(final QpidByteBuffer in)
+ protected void processInput(final ByteBuffer in)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
final byte type = in.get();
- final int channel = in.getUnsignedShort();
- final long bodySize = in.getUnsignedInt();
+ final int channel = ByteBufferUtils.getUnsignedShort(in);
+ final long bodySize = ByteBufferUtils.getUnsignedInt(in);
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
@@ -188,7 +197,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
}
- protected void processFrame(final int channel, final byte type, final long bodySize, final QpidByteBuffer in)
+ protected void processFrame(final int channel, final byte type, final long bodySize, final ByteBuffer in)
throws AMQFrameDecodingException
{
switch (type)
@@ -211,9 +220,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
}
- abstract void processMethod(int channelId,
- QpidByteBuffer in)
- throws AMQFrameDecodingException;
+ abstract void processMethod(int channelId, ByteBuffer in) throws AMQFrameDecodingException;
AMQFrameDecodingException newUnknownMethodException(final int classId,
final int methodId,
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java b/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java
index fe9cbb4..9d31dab 100644
--- a/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java
+++ b/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java
@@ -22,12 +22,11 @@ package org.apache.qpid.codec;
import java.nio.ByteBuffer;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.*;
public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
{
- private QpidByteBuffer _incompleteBuffer;
+ private ByteBuffer _incompleteBuffer;
/**
* Creates a new AMQP decoder.
@@ -43,14 +42,12 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
{
if (_incompleteBuffer == null)
{
- QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(incomingBuffer);
- final int required = decode(qpidByteBuffer);
+ final int required = decode(incomingBuffer);
if (required != 0)
{
- _incompleteBuffer = QpidByteBuffer.allocate(qpidByteBuffer.remaining() + required);
- _incompleteBuffer.put(qpidByteBuffer);
+ _incompleteBuffer = ByteBuffer.allocate(incomingBuffer.remaining() + required);
+ _incompleteBuffer.put(incomingBuffer);
}
- qpidByteBuffer.dispose();
}
else
{
@@ -61,33 +58,29 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
else
{
_incompleteBuffer.flip();
- final QpidByteBuffer aggregatedBuffer =
- QpidByteBuffer.allocate(_incompleteBuffer.remaining() + incomingBuffer.remaining());
+ final ByteBuffer aggregatedBuffer =
+ ByteBuffer.allocate(_incompleteBuffer.remaining() + incomingBuffer.remaining());
aggregatedBuffer.put(_incompleteBuffer);
aggregatedBuffer.put(incomingBuffer);
aggregatedBuffer.flip();
final int required = decode(aggregatedBuffer);
- _incompleteBuffer.dispose();
if (required != 0)
{
- _incompleteBuffer = QpidByteBuffer.allocate(aggregatedBuffer.remaining() + required);
+ _incompleteBuffer = ByteBuffer.allocate(aggregatedBuffer.remaining() + required);
_incompleteBuffer.put(aggregatedBuffer);
}
else
{
_incompleteBuffer = null;
}
- aggregatedBuffer.dispose();
}
}
// post-condition: assert(!incomingBuffer.hasRemaining());
}
@Override
- void processMethod(int channelId,
- QpidByteBuffer in)
- throws AMQFrameDecodingException
+ void processMethod(int channelId, ByteBuffer in) throws AMQFrameDecodingException
{
ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor = getMethodProcessor();
ClientChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/codec/ServerDecoder.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/client/src/main/java/org/apache/qpid/codec/ServerDecoder.java
deleted file mode 100644
index 98b3caf..0000000
--- a/client/src/main/java/org/apache/qpid/codec/ServerDecoder.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.codec;
-
-import java.io.IOException;
-
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.framing.*;
-
-public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends ServerChannelMethodProcessor>>
-{
-
- /**
- * Creates a new AMQP decoder.
- *
- * @param methodProcessor method processor
- */
- public ServerDecoder(final ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor)
- {
- super(true, methodProcessor);
- }
-
- public void decodeBuffer(QpidByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
- {
- decode(buf);
- }
-
-
- void processMethod(int channelId,
- QpidByteBuffer in)
- throws AMQFrameDecodingException
- {
- ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor();
- final int classAndMethod = in.getInt();
- int classId = classAndMethod >> 16;
- int methodId = classAndMethod & 0xFFFF;
- methodProcessor.setCurrentMethod(classId, methodId);
- try
- {
- switch (classAndMethod)
- {
- //CONNECTION_CLASS:
- case 0x000a000b:
- ConnectionStartOkBody.process(in, methodProcessor);
- break;
- case 0x000a0015:
- ConnectionSecureOkBody.process(in, methodProcessor);
- break;
- case 0x000a001f:
- ConnectionTuneOkBody.process(in, methodProcessor);
- break;
- case 0x000a0028:
- ConnectionOpenBody.process(in, methodProcessor);
- break;
- case 0x000a0032:
- if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
- {
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
- }
- else
- {
- ConnectionCloseBody.process(in, methodProcessor);
- }
- break;
- case 0x000a0033:
- if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
- {
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
- }
- else
- {
- methodProcessor.receiveConnectionCloseOk();
- }
- break;
- case 0x000a003c:
- if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
- {
- ConnectionCloseBody.process(in, methodProcessor);
- }
- else
- {
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
- }
- break;
- case 0x000a003d:
- if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
- {
- methodProcessor.receiveConnectionCloseOk();
- }
- else
- {
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
- }
- break;
-
- // CHANNEL_CLASS:
-
- case 0x0014000a:
- ChannelOpenBody.process(channelId, in, methodProcessor);
- break;
- case 0x00140014:
- ChannelFlowBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00140015:
- ChannelFlowOkBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00140028:
- ChannelCloseBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00140029:
- methodProcessor.getChannelMethodProcessor(channelId).receiveChannelCloseOk();
- break;
-
- // ACCESS_CLASS:
-
- case 0x001e000a:
- AccessRequestBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
- // EXCHANGE_CLASS:
-
- case 0x0028000a:
- ExchangeDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00280014:
- ExchangeDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00280016:
- ExchangeBoundBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
-
- // QUEUE_CLASS:
-
- case 0x0032000a:
- QueueDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00320014:
- QueueBindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x0032001e:
- QueuePurgeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00320028:
- QueueDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00320032:
- QueueUnbindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
-
- // BASIC_CLASS:
-
- case 0x003c000a:
- BasicQosBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0014:
- BasicConsumeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c001e:
- BasicCancelBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0028:
- BasicPublishBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0046:
- BasicGetBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0050:
- BasicAckBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c005a:
- BasicRejectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0064:
- BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(),
- methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0066:
- BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c006e:
- BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0078:
- BasicNackBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
- // CONFIRM CLASS:
-
- case 0x0055000a:
- ConfirmSelectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
- // TX_CLASS:
-
- case 0x005a000a:
- if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
- {
- methodProcessor.getChannelMethodProcessor(channelId).receiveTxSelect();
- }
- break;
- case 0x005a0014:
- if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
- {
- methodProcessor.getChannelMethodProcessor(channelId).receiveTxCommit();
- }
- break;
- case 0x005a001e:
- if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
- {
- methodProcessor.getChannelMethodProcessor(channelId).receiveTxRollback();
- }
- break;
-
-
- default:
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
-
- }
- }
- finally
- {
- methodProcessor.setCurrentMethod(0, 0);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AMQFrame.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AMQFrame.java b/client/src/main/java/org/apache/qpid/framing/AMQFrame.java
index e838dd4..1a13795 100644
--- a/client/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/client/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferUtils;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -48,7 +50,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
}
- private static final QpidByteBuffer FRAME_END_BYTE_BUFFER = QpidByteBuffer.allocateDirect(1);
+ private static final ByteBuffer FRAME_END_BYTE_BUFFER = ByteBuffer.allocate(1);
static
{
FRAME_END_BYTE_BUFFER.put(FRAME_END_BYTE);
@@ -58,14 +60,13 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
@Override
public long writePayload(final ByteBufferSender sender)
{
- QpidByteBuffer frameHeader = QpidByteBuffer.allocate(sender.isDirectBufferPreferred(), HEADER_SIZE);
+ ByteBuffer frameHeader = ByteBuffer.allocate(HEADER_SIZE);
frameHeader.put(_bodyFrame.getFrameType());
- frameHeader.putUnsignedShort(_channel);
- frameHeader.putUnsignedInt((long) _bodyFrame.getSize());
+ ByteBufferUtils.putUnsignedShort(frameHeader, _channel);
+ ByteBufferUtils.putUnsignedInt(frameHeader, _bodyFrame.getSize());
frameHeader.flip();
sender.send(frameHeader);
- frameHeader.dispose();
long size = 8 + _bodyFrame.writePayload(sender);
sender.send(FRAME_END_BYTE_BUFFER.duplicate());
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
index a765288..8ecda94 100644
--- a/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
+++ b/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -20,13 +20,15 @@
*/
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferUtils;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -74,17 +76,16 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
final int size = getSize();
- QpidByteBuffer buf = QpidByteBuffer.allocate(sender.isDirectBufferPreferred(), size);
- buf.putUnsignedShort(getClazz());
- buf.putUnsignedShort(getMethod());
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ ByteBufferUtils.putUnsignedShort(buf, getClazz());
+ ByteBufferUtils.putUnsignedShort(buf, getMethod());
writeMethodPayload(buf);
buf.flip();
sender.send(buf);
- buf.dispose();
return size;
}
- abstract protected void writeMethodPayload(QpidByteBuffer buffer);
+ abstract protected void writeMethodPayload(ByteBuffer buffer);
protected int getSizeOf(AMQShortString string)
@@ -92,18 +93,18 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return EncodingUtils.encodedShortStringLength(string);
}
- protected void writeByte(QpidByteBuffer buffer, byte b)
+ protected void writeByte(ByteBuffer buffer, byte b)
{
buffer.put(b);
}
- protected void writeAMQShortString(QpidByteBuffer buffer, AMQShortString string)
+ protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
{
EncodingUtils.writeShortStringBytes(buffer, string);
}
- protected void writeInt(QpidByteBuffer buffer, int i)
+ protected void writeInt(ByteBuffer buffer, int i)
{
buffer.putInt(i);
}
@@ -114,12 +115,12 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
}
- protected void writeFieldTable(QpidByteBuffer buffer, FieldTable table)
+ protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
{
EncodingUtils.writeFieldTableBytes(buffer, table);
}
- protected void writeLong(QpidByteBuffer buffer, long l)
+ protected void writeLong(ByteBuffer buffer, long l)
{
buffer.putLong(l);
}
@@ -130,34 +131,34 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return (response == null) ? 4 : response.length + 4;
}
- protected void writeBytes(QpidByteBuffer buffer, byte[] data)
+ protected void writeBytes(ByteBuffer buffer, byte[] data)
{
EncodingUtils.writeBytes(buffer,data);
}
- protected void writeShort(QpidByteBuffer buffer, short s)
+ protected void writeShort(ByteBuffer buffer, short s)
{
buffer.putShort(s);
}
- protected void writeBitfield(QpidByteBuffer buffer, byte bitfield0)
+ protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
{
buffer.put(bitfield0);
}
- protected void writeUnsignedShort(QpidByteBuffer buffer, int s)
+ protected void writeUnsignedShort(ByteBuffer buffer, int s)
{
- buffer.putUnsignedShort(s);
+ ByteBufferUtils.putUnsignedShort(buffer, s);
}
- protected void writeUnsignedInteger(QpidByteBuffer buffer, long i)
+ protected void writeUnsignedInteger(ByteBuffer buffer, long i)
{
- buffer.putUnsignedInt(i);
+ ByteBufferUtils.putUnsignedInt(buffer, i);
}
- protected void writeUnsignedByte(QpidByteBuffer buffer, short unsignedByte)
+ protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
{
- buffer.putUnsignedByte(unsignedByte);
+ ByteBufferUtils.putUnsignedByte(buffer, unsignedByte);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AMQShortString.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AMQShortString.java b/client/src/main/java/org/apache/qpid/framing/AMQShortString.java
index a2ce6fd..9d2ba59 100644
--- a/client/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/client/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -28,8 +28,6 @@ import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
/**
* A short string is a representation of an AMQ Short String
* Short strings differ from the Java String class by being limited to on ASCII characters (0-127)
@@ -123,34 +121,6 @@ public final class AMQShortString implements Comparable<AMQShortString>
}
}
- public static AMQShortString readAMQShortString(QpidByteBuffer buffer)
- {
- int length = ((int) buffer.get()) & 0xff;
- if(length == 0)
- {
- return null;
- }
- else
- {
- if (length > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
- if(length > buffer.remaining())
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with length "
- + length + " from a ByteBuffer with only "
- + buffer.remaining()
- + " bytes.");
-
- }
- byte[] data = new byte[length];
- buffer.get(data);
- return new AMQShortString(data, 0, length);
- }
- }
-
-
public AMQShortString(byte[] data, final int offset, final int length)
{
if (length > MAX_LENGTH)
@@ -197,7 +167,7 @@ public final class AMQShortString implements Comparable<AMQShortString>
}
}
- public void writeToBuffer(QpidByteBuffer buffer)
+ public void writeToBuffer(ByteBuffer buffer)
{
final int size = length();
buffer.put((byte)size);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org