You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by lq...@apache.org on 2017/11/10 15:50:48 UTC
[1/4] qpid-jms-amqp-0-x git commit: QPID-7725: [Java Client, AMQP 0-x] Remove QpidByteBuffer
Repository: qpid-jms-amqp-0-x
Updated Branches:
refs/heads/master 8443f85a5 -> 95203e5d1
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java b/client/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java
deleted file mode 100644
index c7b421e..0000000
--- a/client/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStreamTest.java
+++ /dev/null
@@ -1,114 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.bytebuffer;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import junit.framework.TestCase;
-import org.junit.Assert;
-
-public class QpidByteBufferOutputStreamTest extends TestCase
-{
- public void testWriteByteByByte() throws Exception
- {
- boolean direct = false;
- QpidByteBufferOutputStream stream = new QpidByteBufferOutputStream(direct, 3);
- stream.write('a');
- stream.write('b');
-
- Collection<QpidByteBuffer> bufs = stream.fetchAccumulatedBuffers();
- assertEquals("Unexpected number of buffers", 2, bufs.size());
- Iterator<QpidByteBuffer> bufItr = bufs.iterator();
-
- QpidByteBuffer buf1 = bufItr.next();
- assertBufferContent("1st buffer", buf1, "a".getBytes(), direct);
-
-
- QpidByteBuffer buf2 = bufItr.next();
- assertBufferContent("2nd buffer", buf2, "b".getBytes(), direct);
- }
-
- public void testWriteByteArrays() throws Exception
- {
- boolean direct = false;
- QpidByteBufferOutputStream stream = new QpidByteBufferOutputStream(direct, 8);
- stream.write("abcd".getBytes(), 0, 4);
- stream.write("_ef_".getBytes(), 1, 2);
-
- Collection<QpidByteBuffer> bufs = stream.fetchAccumulatedBuffers();
- assertEquals("Unexpected number of buffers", 2, bufs.size());
- Iterator<QpidByteBuffer> bufItr = bufs.iterator();
-
- QpidByteBuffer buf1 = bufItr.next();
- assertBufferContent("1st buffer", buf1, "abcd".getBytes(), direct);
-
- QpidByteBuffer buf2 = bufItr.next();
- assertBufferContent("2nd buffer", buf2, "ef".getBytes(), direct);
- }
-
- public void testWriteMixed() throws Exception
- {
- boolean direct = true;
- QpidByteBufferOutputStream stream = new QpidByteBufferOutputStream(direct, 3);
- stream.write('a');
- stream.write("bcd".getBytes());
-
- Collection<QpidByteBuffer> bufs = stream.fetchAccumulatedBuffers();
- assertEquals("Unexpected number of buffers", 2, bufs.size());
- Iterator<QpidByteBuffer> bufItr = bufs.iterator();
-
- QpidByteBuffer buf1 = bufItr.next();
- assertBufferContent("1st buffer", buf1, "a".getBytes(), direct);
-
- QpidByteBuffer buf2 = bufItr.next();
- assertBufferContent("2nd buffer", buf2, "bcd".getBytes(), direct);
- }
-
-
- public void testWriteByteArrays_ArrayTooLargeForSingleBuffer() throws Exception
- {
- boolean direct = false;
- QpidByteBufferOutputStream stream = new QpidByteBufferOutputStream(direct, 8);
- stream.write("abcdefghi".getBytes());
-
- Collection<QpidByteBuffer> bufs = stream.fetchAccumulatedBuffers();
- assertEquals("Unexpected number of buffers", 2, bufs.size());
- Iterator<QpidByteBuffer> bufItr = bufs.iterator();
-
- QpidByteBuffer buf1 = bufItr.next();
- assertBufferContent("1st buffer", buf1, "abcdefgh".getBytes(), direct);
-
- QpidByteBuffer buf2 = bufItr.next();
- assertBufferContent("2nd buffer", buf2, "i".getBytes(), direct);
- }
-
- private void assertBufferContent(String bufName, QpidByteBuffer buf, byte[] expectedBytes, final boolean direct)
- {
- assertEquals(bufName + " has unexpected number of bytes", expectedBytes.length, buf.remaining());
- byte[] copy = new byte[buf.remaining()];
- buf.get(copy);
- Assert.assertArrayEquals(bufName + " has unexpected content", expectedBytes, copy);
- assertEquals(bufName + " has unexpected type", direct, buf.isDirect());
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java b/client/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java
deleted file mode 100644
index 67f5634..0000000
--- a/client/src/test/java/org/apache/qpid/bytebuffer/QpidByteBufferTest.java
+++ /dev/null
@@ -1,917 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.bytebuffer;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-
-import com.google.common.io.ByteStreams;
-import org.junit.Assert;
-import org.mockito.internal.util.Primitives;
-
-import org.apache.qpid.test.utils.QpidTestCase;
-import org.apache.qpid.util.ByteBufferUtils;
-
-public class QpidByteBufferTest extends QpidTestCase
-{
- private static final int BUFFER_SIZE = 10;
- private static final int POOL_SIZE = 20;
-
-
- private QpidByteBuffer _slicedBuffer;
- private QpidByteBuffer _parent;
-
- @Override
- protected void setUp() throws Exception
- {
- super.setUp();
- QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE);
- _parent = QpidByteBuffer.allocateDirect(BUFFER_SIZE);
- }
-
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
- _parent.dispose();
- if (_slicedBuffer != null)
- {
- _slicedBuffer.dispose();
- }
- }
-
- public void testPutGetByIndex() throws Exception
- {
- testPutGetByIndex(double.class, 1.0);
- testPutGetByIndex(float.class, 1.0f);
- testPutGetByIndex(long.class, 1L);
- testPutGetByIndex(int.class, 1);
- testPutGetByIndex(char.class, 'A');
- testPutGetByIndex(short.class, (short)1);
- testPutGetByIndex(byte.class, (byte)1);
- }
-
- public void testPutGet() throws Exception
- {
- testPutGet(double.class, false, 1.0);
- testPutGet(float.class, false, 1.0f);
- testPutGet(long.class, false, 1L);
- testPutGet(int.class, false, 1);
- testPutGet(char.class, false, 'A');
- testPutGet(short.class, false, (short)1);
- testPutGet(byte.class, false, (byte)1);
-
- testPutGet(int.class, true, 1L);
- testPutGet(short.class, true, 1);
- testPutGet(byte.class, true, (short)1);
- }
-
- public void testMarkReset() throws Exception
- {
- _slicedBuffer = createSlice();
-
- _slicedBuffer.mark();
- _slicedBuffer.position(_slicedBuffer.position() + 1);
- assertEquals("Unexpected position after move", 1, _slicedBuffer.position());
-
- _slicedBuffer.reset();
- assertEquals("Unexpected position after reset", 0, _slicedBuffer.position());
- }
-
- public void testPosition() throws Exception
- {
- _slicedBuffer = createSlice();
-
- assertEquals("Unexpected position for new slice", 0, _slicedBuffer.position());
-
- _slicedBuffer.position(1);
- assertEquals("Unexpected position after advance", 1, _slicedBuffer.position());
-
- final int oldLimit = _slicedBuffer.limit();
- _slicedBuffer.limit(oldLimit - 1);
- try
- {
- _slicedBuffer.position(oldLimit);
- fail("Exception not thrown");
- }
- catch (IllegalArgumentException e)
- {
- // pass
- }
- }
-
- public void testBulkPutGet() throws Exception
- {
- _slicedBuffer = createSlice();
-
- final byte[] source = getTestBytes(_slicedBuffer.remaining());
-
- QpidByteBuffer rv = _slicedBuffer.put(source, 0, source.length);
- assertEquals("Unexpected builder return value", _slicedBuffer, rv);
-
- _slicedBuffer.flip();
- byte[] target = new byte[_slicedBuffer.remaining()];
- rv = _slicedBuffer.get(target, 0, target.length);
- assertEquals("Unexpected builder return value", _slicedBuffer, rv);
-
- Assert.assertArrayEquals("Unexpected bulk put/get result", source, target);
-
-
- _slicedBuffer.clear();
- _slicedBuffer.position(1);
-
- try
- {
- _slicedBuffer.put(source, 0, source.length);
- fail("Exception not thrown");
- }
- catch (BufferOverflowException e)
- {
- // pass
- }
-
- assertEquals("Position should be unchanged after failed put", 1, _slicedBuffer.position());
-
- try
- {
- _slicedBuffer.get(target, 0, target.length);
- fail("Exception not thrown");
- }
- catch (BufferUnderflowException e)
- {
- // pass
- }
-
- assertEquals("Position should be unchanged after failed get", 1, _slicedBuffer.position());
-
-
- }
-
- public void testByteBufferPutGet()
- {
- _slicedBuffer = createSlice();
- final byte[] source = getTestBytes(_slicedBuffer.remaining());
-
- ByteBuffer sourceByteBuffer = ByteBuffer.wrap(source);
-
- QpidByteBuffer rv = _slicedBuffer.put(sourceByteBuffer);
- assertEquals("Unexpected builder return value", _slicedBuffer, rv);
-
- assertEquals("Unexpected position", _slicedBuffer.capacity(), _slicedBuffer.position());
- assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining());
-
- assertEquals("Unexpected remaining in source ByteBuffer", 0, sourceByteBuffer.remaining());
-
- _slicedBuffer.flip();
-
- ByteBuffer destinationByteBuffer = ByteBuffer.allocate(source.length);
- _slicedBuffer.get(destinationByteBuffer);
-
-
- assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining());
-
- assertEquals("Unexpected remaining in destination ByteBuffer", 0, destinationByteBuffer.remaining());
- assertEquals("Unexpected position in destination ByteBuffer", source.length, destinationByteBuffer.position());
-
- Assert.assertArrayEquals("Unexpected ByteBuffer put/get result", source, destinationByteBuffer.array());
-
- _slicedBuffer.clear();
- _slicedBuffer.position(1);
-
- sourceByteBuffer.clear();
- try
- {
- _slicedBuffer.put(sourceByteBuffer);
- fail("Exception should be thrown");
- }
- catch(BufferOverflowException e)
- {
- // pass
- }
-
- assertEquals("Position should not be changed after failed put", 1, _slicedBuffer.position());
- assertEquals("Source position should not changed after failed put", source.length, sourceByteBuffer.remaining());
-
- _slicedBuffer.clear();
- destinationByteBuffer.position(1);
-
- try
- {
- _slicedBuffer.get(destinationByteBuffer);
- fail("Exception should be thrown");
- }
- catch(BufferUnderflowException e )
- {
- // pass
- }
- }
-
- public void testQpidByteBufferPutGet()
- {
- _slicedBuffer = createSlice();
- final byte[] source = getTestBytes(_slicedBuffer.remaining());
-
- QpidByteBuffer sourceQpidByteBuffer = QpidByteBuffer.wrap(source);
-
- QpidByteBuffer rv = _slicedBuffer.put(sourceQpidByteBuffer);
- assertEquals("Unexpected builder return value", _slicedBuffer, rv);
-
- assertEquals("Unexpected position", _slicedBuffer.capacity(), _slicedBuffer.position());
- assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining());
-
- assertEquals("Unexpected remaining in source QpidByteBuffer", 0, sourceQpidByteBuffer.remaining());
-
- _slicedBuffer.flip();
-
- ByteBuffer destinationByteBuffer = ByteBuffer.allocate(source.length);
- _slicedBuffer.get(destinationByteBuffer);
-
- assertEquals("Unexpected remaining", 0, _slicedBuffer.remaining());
-
- assertEquals("Unexpected remaining in destination ByteBuffer", 0, destinationByteBuffer.remaining());
- assertEquals("Unexpected position in destination ByteBuffer", source.length, destinationByteBuffer.position());
-
- Assert.assertArrayEquals("Unexpected ByteBuffer put/get result", source, destinationByteBuffer.array());
-
- _slicedBuffer.clear();
- _slicedBuffer.position(1);
-
- sourceQpidByteBuffer.clear();
- try
- {
- _slicedBuffer.put(sourceQpidByteBuffer);
- fail("Exception should be thrown");
- }
- catch(BufferOverflowException e)
- {
- // pass
- }
-
- assertEquals("Position should not be changed after failed put", 1, _slicedBuffer.position());
- assertEquals("Source position should not changed after failed put", source.length, sourceQpidByteBuffer.remaining());
- }
-
- public void testDuplicate()
- {
- _slicedBuffer = createSlice();
- _slicedBuffer.position(1);
- int originalLimit = _slicedBuffer.limit();
- _slicedBuffer.limit(originalLimit - 1);
-
- QpidByteBuffer duplicate = _slicedBuffer.duplicate();
- try
- {
- assertEquals("Unexpected position", _slicedBuffer.position(), duplicate.position() );
- assertEquals("Unexpected limit", _slicedBuffer.limit(), duplicate.limit() );
- assertEquals("Unexpected capacity", _slicedBuffer.capacity(), duplicate.capacity() );
-
- duplicate.position(2);
- duplicate.limit(originalLimit - 2);
-
- assertEquals("Unexpected position in the original", 1, _slicedBuffer.position());
- assertEquals("Unexpected limit in the original", originalLimit -1, _slicedBuffer.limit());
- }
- finally
- {
- duplicate.dispose();
- }
- }
-
- public void testCopyToByteBuffer()
- {
- _slicedBuffer = createSlice();
- byte[] source = getTestBytes(_slicedBuffer.remaining());
- _slicedBuffer.put(source);
- _slicedBuffer.flip();
-
- int originalRemaining = _slicedBuffer.remaining();
- ByteBuffer destination = ByteBuffer.allocate(source.length);
- _slicedBuffer.copyTo(destination);
-
- assertEquals("Unexpected remaining in original QBB", originalRemaining, _slicedBuffer.remaining());
- assertEquals("Unexpected remaining in destination", 0, destination.remaining());
-
- Assert.assertArrayEquals("Unexpected copyTo result", source, destination.array());
- }
-
- public void testCopyToArray()
- {
- _slicedBuffer = createSlice();
- byte[] source = getTestBytes(_slicedBuffer.remaining());
- _slicedBuffer.put(source);
- _slicedBuffer.flip();
-
- int originalRemaining = _slicedBuffer.remaining();
- byte[] destination = new byte[source.length];
- _slicedBuffer.copyTo(destination);
-
- assertEquals("Unexpected remaining in original QBB", originalRemaining, _slicedBuffer.remaining());
-
- Assert.assertArrayEquals("Unexpected copyTo result", source, destination);
- }
-
- public void testPutCopyOf()
- {
- _slicedBuffer = createSlice();
- byte[] source = getTestBytes(_slicedBuffer.remaining());
-
- QpidByteBuffer sourceQpidByteBuffer = QpidByteBuffer.wrap(source);
- _slicedBuffer.putCopyOf(sourceQpidByteBuffer);
-
- assertEquals("Copied buffer should not be changed", source.length, sourceQpidByteBuffer.remaining());
- assertEquals("Buffer should be full", 0, _slicedBuffer.remaining());
- _slicedBuffer.flip();
-
- byte[] destination = new byte[source.length];
- _slicedBuffer.get(destination);
-
- Assert.assertArrayEquals("Unexpected putCopyOf result", source, destination);
- }
-
- public void testCompact()
- {
- _slicedBuffer = createSlice();
- byte[] source = getTestBytes(_slicedBuffer.remaining());
- _slicedBuffer.put(source);
-
- _slicedBuffer.position(1);
- _slicedBuffer.limit(_slicedBuffer.limit() - 1);
-
- int remaining = _slicedBuffer.remaining();
- _slicedBuffer.compact();
-
- assertEquals("Unexpected position", remaining, _slicedBuffer.position());
- assertEquals("Unexpected limit", _slicedBuffer.capacity(), _slicedBuffer.limit());
-
- _slicedBuffer.flip();
-
-
- byte[] destination = new byte[_slicedBuffer.remaining()];
- _slicedBuffer.get(destination);
-
- byte[] expected = new byte[source.length - 2];
- System.arraycopy(source, 1, expected, 0, expected.length);
-
- Assert.assertArrayEquals("Unexpected compact result", expected, destination);
- }
-
- public void testSliceOfSlice()
- {
- _slicedBuffer = createSlice();
- byte[] source = getTestBytes(_slicedBuffer.remaining());
- _slicedBuffer.put(source);
-
- _slicedBuffer.position(1);
- _slicedBuffer.limit(_slicedBuffer.limit() - 1);
-
- int remaining = _slicedBuffer.remaining();
- QpidByteBuffer newSlice = _slicedBuffer.slice();
- try
- {
- assertEquals("Unexpected position in original", 1, _slicedBuffer.position());
- assertEquals("Unexpected limit in original", source.length - 1, _slicedBuffer.limit());
- assertEquals("Unexpected position", 0, newSlice.position());
- assertEquals("Unexpected limit", remaining, newSlice.limit());
- assertEquals("Unexpected capacity", remaining, newSlice.capacity());
-
- byte[] destination = new byte[newSlice.remaining()];
- newSlice.get(destination);
-
- byte[] expected = new byte[source.length - 2];
- System.arraycopy(source, 1, expected, 0, expected.length);
- Assert.assertArrayEquals("Unexpected slice result", expected, destination);
- }
- finally
- {
- newSlice.dispose();
- }
- }
-
- public void testViewOfSlice()
- {
- _slicedBuffer = createSlice();
- byte[] source = getTestBytes(_slicedBuffer.remaining());
- _slicedBuffer.put(source);
-
- _slicedBuffer.position(1);
- _slicedBuffer.limit(_slicedBuffer.limit() - 1);
-
- QpidByteBuffer view = _slicedBuffer.view(0, _slicedBuffer.remaining());
- try
- {
- assertEquals("Unexpected position in original", 1, _slicedBuffer.position());
- assertEquals("Unexpected limit in original", source.length - 1, _slicedBuffer.limit());
-
- assertEquals("Unexpected position", 0, view.position());
- assertEquals("Unexpected limit", _slicedBuffer.remaining(), view.limit());
- assertEquals("Unexpected capacity", _slicedBuffer.remaining(), view.capacity());
-
- byte[] destination = new byte[view.remaining()];
- view.get(destination);
-
- byte[] expected = new byte[source.length - 2];
- System.arraycopy(source, 1, expected, 0, expected.length);
- Assert.assertArrayEquals("Unexpected view result", expected, destination);
- }
- finally
- {
- view.dispose();
- }
-
- view = _slicedBuffer.view(1, _slicedBuffer.remaining() - 2);
- try
- {
- assertEquals("Unexpected position in original", 1, _slicedBuffer.position());
- assertEquals("Unexpected limit in original", source.length - 1, _slicedBuffer.limit());
-
- assertEquals("Unexpected position", 0, view.position());
- assertEquals("Unexpected limit", _slicedBuffer.remaining() - 2, view.limit());
- assertEquals("Unexpected capacity", _slicedBuffer.remaining() - 2, view.capacity());
-
- byte[] destination = new byte[view.remaining()];
- view.get(destination);
-
- byte[] expected = new byte[source.length - 4];
- System.arraycopy(source, 2, expected, 0, expected.length);
- Assert.assertArrayEquals("Unexpected view result", expected, destination);
- }
- finally
- {
- view.dispose();
- }
- }
-
- public void testAsInputStream() throws Exception
- {
- _slicedBuffer = createSlice();
- byte[] source = getTestBytes(_slicedBuffer.remaining());
- _slicedBuffer.put(source);
-
- _slicedBuffer.position(1);
- _slicedBuffer.limit(_slicedBuffer.limit() - 1);
-
- ByteArrayOutputStream destination = new ByteArrayOutputStream();
- try(InputStream is = _slicedBuffer.asInputStream())
- {
- ByteStreams.copy(is, destination);
- }
-
- byte[] expected = new byte[source.length - 2];
- System.arraycopy(source, 1, expected, 0, expected.length);
- Assert.assertArrayEquals("Unexpected view result", expected, destination.toByteArray());
- }
-
- public void testAsByteBuffer() throws Exception
- {
- _slicedBuffer = createSlice();
-
- _slicedBuffer.position(1);
- _slicedBuffer.limit(_slicedBuffer.limit() - 1);
-
- _slicedBuffer.mark();
- int remaining = _slicedBuffer.remaining();
- byte[] source = getTestBytes(remaining);
- _slicedBuffer.put(source);
- _slicedBuffer.reset();
-
- ByteBuffer buffer = _slicedBuffer.asByteBuffer();
- assertEquals("Unexpected remaining", remaining, buffer.remaining());
-
- byte[] target = new byte[remaining];
- buffer.get(target);
- Assert.assertArrayEquals("Unexpected asByteBuffer result", source, target);
- }
-
- public void testDecode()
- {
- _slicedBuffer = createSlice();
- final String input = "ABC";
- _slicedBuffer.put(input.getBytes());
- _slicedBuffer.flip();
-
- final CharBuffer charBuffer = _slicedBuffer.decode(StandardCharsets.US_ASCII);
- final char[] destination = new char[charBuffer.remaining()];
- charBuffer.get(destination);
- Assert.assertArrayEquals("Unexpected char buffer", input.toCharArray(), destination);
- }
-
- private byte[] getTestBytes(final int length)
- {
- final byte[] source = new byte[length];
- for (int i = 0; i < source.length; i++)
- {
- source[i] = (byte) ('A' + i);
- }
- return source;
- }
-
- private QpidByteBuffer createSlice()
- {
- _parent.position(1);
- _parent.limit(_parent.capacity() - 1);
-
- return _parent.slice();
- }
-
- private void testPutGet(final Class<?> primitiveTargetClass, final boolean unsigned, final Object value) throws Exception
- {
- int size = sizeof(primitiveTargetClass);
-
- _parent.position(1);
- _parent.limit(size + 1);
-
- _slicedBuffer = _parent.slice();
- _parent.limit(_parent.capacity());
-
- assertEquals("Unexpected position ", 0, _slicedBuffer.position());
- assertEquals("Unexpected limit ", size, _slicedBuffer.limit());
- assertEquals("Unexpected capacity ", size, _slicedBuffer.capacity());
-
- String methodSuffix = getMethodSuffix(primitiveTargetClass, unsigned);
- Method put = _slicedBuffer.getClass().getMethod("put" + methodSuffix, Primitives.primitiveTypeOf(value.getClass()));
- Method get = _slicedBuffer.getClass().getMethod("get" + methodSuffix);
-
-
- _slicedBuffer.mark();
- QpidByteBuffer rv = (QpidByteBuffer) put.invoke(_slicedBuffer, value);
- assertEquals("Unexpected builder return value for type " + methodSuffix, _slicedBuffer, rv);
-
- assertEquals("Unexpected position for type " + methodSuffix, size, _slicedBuffer.position());
-
- try
- {
- invokeMethod(put, value);
- fail("BufferOverflowException should be thrown for put with insufficient room for " + methodSuffix);
- }
- catch (BufferOverflowException e)
- {
- // pass
- }
-
- _slicedBuffer.reset();
-
- assertEquals("Unexpected position after reset", 0, _slicedBuffer.position());
-
- Object retrievedValue = get.invoke(_slicedBuffer);
- assertEquals("Unexpected value retrieved from get method for " + methodSuffix, value, retrievedValue);
- try
- {
- invokeMethod(get);
- fail("BufferUnderflowException not thrown for get with insufficient room for " + methodSuffix);
- }
- catch (BufferUnderflowException ite)
- {
- // pass
- }
- }
-
- private void testPutGetByIndex(final Class<?> primitiveTargetClass, Object value) throws Exception
- {
- int size = sizeof(primitiveTargetClass);
-
- _parent.position(1);
- _parent.limit(size + 1);
-
- _slicedBuffer = _parent.slice();
- _parent.limit(_parent.capacity());
-
- String methodSuffix = getMethodSuffix(primitiveTargetClass, false);
- Method put = _slicedBuffer.getClass().getMethod("put" + methodSuffix, int.class, primitiveTargetClass);
- Method get = _slicedBuffer.getClass().getMethod("get" + methodSuffix, int.class);
-
- QpidByteBuffer rv = (QpidByteBuffer) put.invoke(_slicedBuffer, 0, value);
- assertEquals("Unexpected builder return value for type " + methodSuffix, _slicedBuffer, rv);
-
- Object retrievedValue = get.invoke(_slicedBuffer, 0);
- assertEquals("Unexpected value retrieved from index get method for " + methodSuffix, value, retrievedValue);
-
- try
- {
- invokeMethod(put, 1, value);
- fail("IndexOutOfBoundsException not thrown for indexed " + methodSuffix + " put");
- }
- catch (IndexOutOfBoundsException ite)
- {
- // pass
- }
-
- try
- {
- invokeMethod(put, -1, value);
- fail("IndexOutOfBoundsException not thrown for indexed " + methodSuffix + " put with negative index");
- }
- catch (IndexOutOfBoundsException ite)
- {
- // pass
- }
-
- try
- {
- invokeMethod(get, 1);
- fail("IndexOutOfBoundsException not thrown for indexed " + methodSuffix + " get");
- }
- catch (IndexOutOfBoundsException ite)
- {
- // pass
- }
-
- try
- {
- invokeMethod(get, -1);
- fail("IndexOutOfBoundsException not thrown for indexed " + methodSuffix + " get with negative index");
- }
- catch (IndexOutOfBoundsException ite)
- {
- // pass
- }
- }
-
- private void invokeMethod(final Method method, final Object... value)
- throws Exception
- {
- try
- {
- method.invoke(_slicedBuffer, value);
- }
- catch (InvocationTargetException e)
- {
- Throwable cause = e.getCause();
- if (cause instanceof Exception)
- {
- throw (Exception)cause;
- }
- fail(String.format("Unexpected throwable on method %s invocation: %s", method.getName(), cause));
- }
- }
-
-
- private String getMethodSuffix(final Class<?> target, final boolean unsigned)
- {
- StringBuilder name = new StringBuilder();
- if (unsigned)
- {
- name.append("Unsigned");
- }
- if ((!target.isAssignableFrom(byte.class) || unsigned))
- {
- String simpleName = target.getSimpleName();
- name.append(simpleName.substring(0, 1).toUpperCase()).append(simpleName.substring(1));
- }
-
- return name.toString();
- }
-
- private int sizeof(final Class<?> type)
- {
- if (type.isAssignableFrom(double.class))
- {
- return 8;
- }
- else if (type.isAssignableFrom(float.class))
- {
- return 4;
- }
- else if (type.isAssignableFrom(long.class))
- {
- return 8;
- }
- else if (type.isAssignableFrom(int.class))
- {
- return 4;
- }
- else if (type.isAssignableFrom(short.class))
- {
- return 2;
- }
- else if (type.isAssignableFrom(char.class))
- {
- return 2;
- }
- else if (type.isAssignableFrom(byte.class))
- {
- return 1;
- }
- else
- {
- throw new UnsupportedOperationException("Unexpected type " + type);
- }
- }
-
- public void testPooledBufferIsZeroedLoan() throws Exception
- {
- QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE);
-
- buffer.put((byte) 0xFF);
- buffer.dispose();
-
- buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE);
- buffer.limit(1);
- assertEquals("Pooled QpidByteBuffer is not zeroed.", (byte) 0x0, buffer.get());
- }
-
- public void testAllocateDirectOfSameSize() throws Exception
- {
- int bufferSize = BUFFER_SIZE;
- QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize);
- assertEquals("Unexpected buffer size", bufferSize, buffer.capacity());
- assertEquals("Unexpected position on newly created buffer", 0, buffer.position());
- assertEquals("Unexpected limit on newly created buffer", bufferSize, buffer.limit());
- }
-
- public void testAllocateDirectOfSmallerSize() throws Exception
- {
- int bufferSize = BUFFER_SIZE - 1;
- QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize);
- assertEquals("Unexpected buffer size", bufferSize, buffer.capacity());
- assertEquals("Unexpected position on newly created buffer", 0, buffer.position());
- assertEquals("Unexpected limit on newly created buffer", bufferSize, buffer.limit());
- }
-
- public void testAllocateDirectOfLargerSize() throws Exception
- {
- int bufferSize = BUFFER_SIZE + 1;
- QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(bufferSize);
- assertEquals("Unexpected buffer size", bufferSize, buffer.capacity());
- assertEquals("Unexpected position on newly created buffer", 0, buffer.position());
- assertEquals("Unexpected limit on newly created buffer", bufferSize, buffer.limit());
- }
-
- public void testAllocateDirectWithNegativeSize() throws Exception
- {
- try
- {
- QpidByteBuffer.allocateDirect(-1);
- fail("It is not legal to create buffer with negative size.");
- }
- catch (IllegalArgumentException e)
- {
- // pass
- }
- }
-
- public void testSettingUpPoolTwice() throws Exception
- {
- try
- {
- QpidByteBuffer.initialisePool(BUFFER_SIZE + 1, POOL_SIZE + 1);
- fail("It is not legal to initialize buffer twice with different settings.");
- }
- catch (IllegalStateException e)
- {
- // pass
- }
- }
-
- public void testDeflateInflateDirect() throws Exception
- {
- byte[] input = "aaabbbcccddddeeeffff".getBytes();
- Collection<QpidByteBuffer> inputBufs = QpidByteBuffer.allocateDirectCollection(input.length);
-
- int offset = 0;
- for (QpidByteBuffer buf : inputBufs)
- {
- int len = buf.remaining();
- buf.put(input, offset, len);
- buf.flip();
- offset += len;
- }
- assertEquals(input.length, ByteBufferUtils.remaining(inputBufs));
-
- doDeflateInflate(input, inputBufs, true);
- }
-
- public void testDeflateInflateHeap() throws Exception
- {
- byte[] input = "aaabbbcccddddeeeffff".getBytes();
- Collection<QpidByteBuffer> inputBufs = Collections.singleton(QpidByteBuffer.wrap(input));
-
- doDeflateInflate(input, inputBufs, false);
- }
-
- public void testInflatingUncompressedBytes_ThrowsZipException() throws Exception
- {
- byte[] input = "not_a_compressed_stream".getBytes();
- QpidByteBuffer original = QpidByteBuffer.wrap(input);
-
- try
- {
- QpidByteBuffer.inflate(Collections.singleton(original));
- fail("Exception not thrown");
- }
- catch(java.util.zip.ZipException ze)
- {
- // PASS
- }
- }
-
- public void testSlice() throws Exception
- {
- QpidByteBuffer directBuffer = QpidByteBuffer.allocate(true, 6);
- directBuffer.position(2);
- directBuffer.limit(5);
- QpidByteBuffer directSlice = directBuffer.slice();
-
- assertTrue("Direct slice should be direct too", directSlice.isDirect());
- assertEquals("Unexpected capacity", 3, directSlice.capacity());
- assertEquals("Unexpected limit", 3, directSlice.limit());
- assertEquals("Unexpected position", 0, directSlice.position());
-
- directBuffer.dispose();
- directSlice.dispose();
- }
-
- public void testView() throws Exception
- {
- byte[] content = "ABCDEF".getBytes();
- QpidByteBuffer buffer = QpidByteBuffer.allocate(true, content.length);
- buffer.put(content);
- buffer.position(2);
- buffer.limit(5);
-
- QpidByteBuffer view = buffer.view(0, buffer.remaining());
-
- assertTrue("Unexpected view direct", view.isDirect());
-
- assertEquals("Unexpected capacity", 3, view.capacity());
- assertEquals("Unexpected limit", 3, view.limit());
- assertEquals("Unexpected position", 0, view.position());
-
- byte[] destination = new byte[view.remaining()];
- view.get(destination);
-
- Assert.assertArrayEquals("CDE".getBytes(), destination);
-
- QpidByteBuffer viewWithOffset = buffer.view(1, 1);
- destination = new byte[viewWithOffset.remaining()];
- viewWithOffset.get(destination);
-
- Assert.assertArrayEquals("D".getBytes(), destination);
-
- buffer.dispose();
- view.dispose();
- viewWithOffset.dispose();
- }
-
- private void doDeflateInflate(byte[] input,
- Collection<QpidByteBuffer> inputBufs,
- boolean direct) throws IOException
- {
- Collection<QpidByteBuffer> deflatedBufs = QpidByteBuffer.deflate(inputBufs);
- assertNotNull(deflatedBufs);
-
- Collection<QpidByteBuffer> inflatedBufs = QpidByteBuffer.inflate(deflatedBufs);
- assertNotNull(inflatedBufs);
- assertTrue("Expected at least on buffer", inflatedBufs.size() >= 1);
-
- int bufNum = 1;
- int inputOffset = 0;
- int inflatedBytesTotal = 0;
- for(QpidByteBuffer inflatedBuf : inflatedBufs)
- {
- assertEquals("Inflated buf " + bufNum + " is of wrong type", direct, inflatedBuf.isDirect());
-
- int inflatedBytesCount = inflatedBuf.remaining();
- inflatedBytesTotal += inflatedBytesCount;
-
- byte[] inflatedBytes = new byte[inflatedBytesCount];
- inflatedBuf.get(inflatedBytes);
- byte[] expectedBytes = Arrays.copyOfRange(input, inputOffset, inputOffset + inflatedBytes.length);
- Assert.assertArrayEquals("Inflated buf" + bufNum + " has unexpected content", expectedBytes, inflatedBytes);
-
- inputOffset += inflatedBytes.length;
- bufNum++;
- }
-
- assertEquals("Unexpected number of inflated bytes", input.length, inflatedBytesTotal);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java b/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
index 2062900..bbdc3d1 100644
--- a/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
+++ b/client/src/test/java/org/apache/qpid/client/message/Encrypted091MessageFactoryTest.java
@@ -38,7 +38,6 @@ import javax.crypto.spec.IvParameterSpec;
import javax.crypto.spec.SecretKeySpec;
import javax.security.auth.x500.X500Principal;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -78,7 +77,7 @@ public class Encrypted091MessageFactoryTest extends QpidTestCase
final int headerLength = _props.getPropertyListSize() + 2;
_unencrypted = new byte[headerLength + _data.length];
- QpidByteBuffer output = QpidByteBuffer.wrap(_unencrypted);
+ ByteBuffer output = ByteBuffer.wrap(_unencrypted);
output.putShort((short) (_props.getPropertyFlags() & 0xffff));
_props.writePropertyListPayload(output);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java b/client/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
index 49505b4..282d3fb 100644
--- a/client/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
+++ b/client/src/test/java/org/apache/qpid/codec/AMQDecoderTest.java
@@ -29,7 +29,6 @@ import java.util.Collection;
import java.util.List;
import java.util.Random;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.AMQBody;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQFrame;
@@ -138,7 +137,7 @@ public class AMQDecoderTest extends QpidTestCase
{
assertEquals(ContentBody.TYPE, ((AMQFrame) frames.get(0)).getBodyFrame().getFrameType());
ContentBody decodedBody = (ContentBody) ((AMQFrame) frames.get(0)).getBodyFrame();
- final ByteBuffer byteBuffer = decodedBody.getPayload().asByteBuffer().duplicate();
+ final ByteBuffer byteBuffer = decodedBody.getPayload().duplicate();
byte[] bodyBytes = new byte[byteBuffer.remaining()];
byteBuffer.get(bodyBytes);
assertTrue("Body was corrupted", Arrays.equals(payload, bodyBytes));
@@ -246,16 +245,10 @@ public class AMQDecoderTest extends QpidTestCase
private static class TestSender implements ByteBufferSender
{
- private final Collection<QpidByteBuffer> _sentBuffers = new ArrayList<>();
+ private final Collection<ByteBuffer> _sentBuffers = new ArrayList<>();
@Override
- public boolean isDirectBufferPreferred()
- {
- return false;
- }
-
- @Override
- public void send(final QpidByteBuffer msg)
+ public void send(final ByteBuffer msg)
{
_sentBuffers.add(msg.duplicate());
msg.position(msg.limit());
@@ -273,7 +266,7 @@ public class AMQDecoderTest extends QpidTestCase
}
- public Collection<QpidByteBuffer> getSentBuffers()
+ public Collection<ByteBuffer> getSentBuffers()
{
return _sentBuffers;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java b/client/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
index c6c870a..2a46dcf 100644
--- a/client/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
+++ b/client/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.test.utils.QpidTestCase;
@@ -78,7 +79,7 @@ public class BasicContentHeaderPropertiesTest extends QpidTestCase
public void testPopulatePropertiesFromBuffer() throws Exception
{
- QpidByteBuffer buf = QpidByteBuffer.wrap(new byte[300]);
+ ByteBuffer buf = ByteBuffer.wrap(new byte[300]);
_testProperties.populatePropertiesFromBuffer(buf, 99, 99);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/test/java/org/apache/qpid/framing/EncodingUtilsTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/framing/EncodingUtilsTest.java b/client/src/test/java/org/apache/qpid/framing/EncodingUtilsTest.java
index dd81e25..e55f000 100644
--- a/client/src/test/java/org/apache/qpid/framing/EncodingUtilsTest.java
+++ b/client/src/test/java/org/apache/qpid/framing/EncodingUtilsTest.java
@@ -20,41 +20,27 @@
package org.apache.qpid.framing;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.test.utils.QpidTestCase;
public class EncodingUtilsTest extends QpidTestCase
{
private static final int BUFFER_SIZE = 10;
- private static final int POOL_SIZE = 20;
- private QpidByteBuffer _buffer;
+ private ByteBuffer _buffer;
@Override
protected void setUp() throws Exception
{
super.setUp();
- QpidByteBuffer.initialisePool(BUFFER_SIZE, POOL_SIZE);
- _buffer = QpidByteBuffer.allocateDirect(BUFFER_SIZE);
- }
-
- @Override
- public void tearDown() throws Exception
- {
- try
- {
- _buffer.dispose();
- }
- finally
- {
- super.tearDown();
- }
+ _buffer = ByteBuffer.allocate(BUFFER_SIZE);
}
public void testReadLongAsShortStringWhenDigitsAreSpecified() throws Exception
{
- _buffer.putUnsignedByte((short)3);
+ _buffer.put((byte)3);
_buffer.put((byte)'9');
_buffer.put((byte)'2');
_buffer.put((byte)'0');
@@ -64,7 +50,7 @@ public class EncodingUtilsTest extends QpidTestCase
public void testReadLongAsShortStringWhenNonDigitCharacterIsSpecified() throws Exception
{
- _buffer.putUnsignedByte((short)2);
+ _buffer.put((byte)2);
_buffer.put((byte)'1');
_buffer.put((byte)'a');
_buffer.flip();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java b/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java
index 508617b..f759ddc 100644
--- a/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java
+++ b/client/src/test/java/org/apache/qpid/framing/FieldTableTest.java
@@ -21,16 +21,16 @@
package org.apache.qpid.framing;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.test.utils.QpidTestCase;
import org.junit.Assert;
import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.test.utils.QpidTestCase;
public class FieldTableTest extends QpidTestCase
{
@@ -465,7 +465,7 @@ public class FieldTableTest extends QpidTestCase
outerTable.setFieldTable("innerTable", innerTable);
// Write the outer table into the buffer.
- QpidByteBuffer buf = QpidByteBuffer.allocate(EncodingUtils.encodedFieldTableLength(outerTable));
+ ByteBuffer buf = ByteBuffer.allocate(EncodingUtils.encodedFieldTableLength(outerTable));
outerTable.writeToBuffer(buf);
@@ -594,7 +594,7 @@ public class FieldTableTest extends QpidTestCase
table.setString("null-string", null);
- QpidByteBuffer buf = QpidByteBuffer.allocate((int) table.getEncodedSize() + 4);
+ ByteBuffer buf = ByteBuffer.allocate((int) table.getEncodedSize() + 4);
table.writeToBuffer(buf);
buf.flip();
@@ -916,7 +916,7 @@ public class FieldTableTest extends QpidTestCase
assertEquals("unexpected data length", 24, length);
//Create a second FieldTable from the encoded bytes
- FieldTable tableFromBytes = new FieldTable(QpidByteBuffer.wrap(data));
+ FieldTable tableFromBytes = new FieldTable(ByteBuffer.wrap(data));
//Create a final FieldTable and addAll() from the table created with encoded bytes
FieldTable destinationTable = new FieldTable();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/4] qpid-jms-amqp-0-x git commit: QPID-7725: [Java Client,
AMQP 0-x] Remove QpidByteBuffer
Posted by lq...@apache.org.
QPID-7725: [Java Client, AMQP 0-x] Remove QpidByteBuffer
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/95203e5d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/95203e5d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/95203e5d
Branch: refs/heads/master
Commit: 95203e5d1228ff5e84bb7993cb92d6212b1b476a
Parents: 8443f85
Author: Lorenz Quack <lq...@apache.org>
Authored: Fri Nov 10 15:50:19 2017 +0000
Committer: Lorenz Quack <lq...@apache.org>
Committed: Fri Nov 10 15:50:19 2017 +0000
----------------------------------------------------------------------
.../org/apache/qpid/bytebuffer/BufferPool.java | 54 --
.../apache/qpid/bytebuffer/ByteBufferRef.java | 34 -
.../qpid/bytebuffer/NonPooledByteBufferRef.java | 57 --
.../qpid/bytebuffer/PooledByteBufferRef.java | 70 --
.../apache/qpid/bytebuffer/QpidByteBuffer.java | 884 ------------------
.../bytebuffer/QpidByteBufferInputStream.java | 109 ---
.../bytebuffer/QpidByteBufferOutputStream.java | 113 ---
.../qpid/client/BasicMessageProducer_0_8.java | 3 +-
.../message/AbstractJMSMessageFactory.java | 26 +-
.../message/Encrypted091MessageFactory.java | 3 +-
.../java/org/apache/qpid/codec/AMQDecoder.java | 31 +-
.../org/apache/qpid/codec/ClientDecoder.java | 23 +-
.../org/apache/qpid/codec/ServerDecoder.java | 251 -----
.../java/org/apache/qpid/framing/AMQFrame.java | 13 +-
.../apache/qpid/framing/AMQMethodBodyImpl.java | 41 +-
.../org/apache/qpid/framing/AMQShortString.java | 32 +-
.../java/org/apache/qpid/framing/AMQType.java | 83 +-
.../org/apache/qpid/framing/AMQTypedValue.java | 17 +-
.../apache/qpid/framing/AccessRequestBody.java | 7 +-
.../qpid/framing/AccessRequestOkBody.java | 10 +-
.../org/apache/qpid/framing/BasicAckBody.java | 7 +-
.../apache/qpid/framing/BasicCancelBody.java | 7 +-
.../apache/qpid/framing/BasicCancelOkBody.java | 7 +-
.../apache/qpid/framing/BasicConsumeBody.java | 10 +-
.../apache/qpid/framing/BasicConsumeOkBody.java | 7 +-
.../framing/BasicContentHeaderProperties.java | 41 +-
.../apache/qpid/framing/BasicDeliverBody.java | 7 +-
.../org/apache/qpid/framing/BasicGetBody.java | 10 +-
.../apache/qpid/framing/BasicGetEmptyBody.java | 7 +-
.../org/apache/qpid/framing/BasicGetOkBody.java | 10 +-
.../org/apache/qpid/framing/BasicNackBody.java | 7 +-
.../apache/qpid/framing/BasicPublishBody.java | 10 +-
.../org/apache/qpid/framing/BasicQosBody.java | 12 +-
.../org/apache/qpid/framing/BasicQosOkBody.java | 5 +-
.../apache/qpid/framing/BasicRecoverBody.java | 7 +-
.../qpid/framing/BasicRecoverSyncBody.java | 7 +-
.../qpid/framing/BasicRecoverSyncOkBody.java | 5 +-
.../apache/qpid/framing/BasicRejectBody.java | 7 +-
.../apache/qpid/framing/BasicReturnBody.java | 10 +-
.../apache/qpid/framing/ChannelAlertBody.java | 10 +-
.../apache/qpid/framing/ChannelCloseBody.java | 14 +-
.../apache/qpid/framing/ChannelCloseOkBody.java | 5 +-
.../apache/qpid/framing/ChannelFlowBody.java | 7 +-
.../apache/qpid/framing/ChannelFlowOkBody.java | 7 +-
.../qpid/framing/ChannelMethodProcessor.java | 4 +-
.../apache/qpid/framing/ChannelOpenBody.java | 7 +-
.../apache/qpid/framing/ChannelOpenOkBody.java | 8 +-
.../apache/qpid/framing/ConfirmSelectBody.java | 7 +-
.../qpid/framing/ConfirmSelectOkBody.java | 5 +-
.../qpid/framing/ConnectionCloseBody.java | 14 +-
.../qpid/framing/ConnectionCloseOkBody.java | 5 +-
.../apache/qpid/framing/ConnectionOpenBody.java | 7 +-
.../qpid/framing/ConnectionOpenOkBody.java | 7 +-
.../qpid/framing/ConnectionRedirectBody.java | 7 +-
.../qpid/framing/ConnectionSecureBody.java | 7 +-
.../qpid/framing/ConnectionSecureOkBody.java | 7 +-
.../qpid/framing/ConnectionStartBody.java | 12 +-
.../qpid/framing/ConnectionStartOkBody.java | 7 +-
.../apache/qpid/framing/ConnectionTuneBody.java | 14 +-
.../qpid/framing/ConnectionTuneOkBody.java | 14 +-
.../org/apache/qpid/framing/ContentBody.java | 23 +-
.../apache/qpid/framing/ContentHeaderBody.java | 39 +-
.../framing/ContentHeaderPropertiesFactory.java | 4 +-
.../org/apache/qpid/framing/EncodingUtils.java | 47 +-
.../apache/qpid/framing/ExchangeBoundBody.java | 7 +-
.../qpid/framing/ExchangeBoundOkBody.java | 11 +-
.../qpid/framing/ExchangeDeclareBody.java | 10 +-
.../qpid/framing/ExchangeDeclareOkBody.java | 5 +-
.../apache/qpid/framing/ExchangeDeleteBody.java | 10 +-
.../qpid/framing/ExchangeDeleteOkBody.java | 5 +-
.../org/apache/qpid/framing/FieldArray.java | 10 +-
.../org/apache/qpid/framing/FieldTable.java | 32 +-
.../framing/FrameCreatingMethodProcessor.java | 5 +-
.../org/apache/qpid/framing/HeartbeatBody.java | 4 +-
.../apache/qpid/framing/ProtocolInitiation.java | 8 +-
.../org/apache/qpid/framing/QueueBindBody.java | 10 +-
.../apache/qpid/framing/QueueBindOkBody.java | 5 +-
.../apache/qpid/framing/QueueDeclareBody.java | 10 +-
.../apache/qpid/framing/QueueDeclareOkBody.java | 12 +-
.../apache/qpid/framing/QueueDeleteBody.java | 10 +-
.../apache/qpid/framing/QueueDeleteOkBody.java | 10 +-
.../org/apache/qpid/framing/QueuePurgeBody.java | 10 +-
.../apache/qpid/framing/QueuePurgeOkBody.java | 10 +-
.../apache/qpid/framing/QueueUnbindBody.java | 10 +-
.../apache/qpid/framing/QueueUnbindOkBody.java | 5 +-
.../org/apache/qpid/framing/TxCommitBody.java | 5 +-
.../org/apache/qpid/framing/TxCommitOkBody.java | 5 +-
.../org/apache/qpid/framing/TxRollbackBody.java | 5 +-
.../apache/qpid/framing/TxRollbackOkBody.java | 5 +-
.../org/apache/qpid/framing/TxSelectBody.java | 5 +-
.../org/apache/qpid/framing/TxSelectOkBody.java | 5 +-
.../apache/qpid/transport/ByteBufferSender.java | 6 +-
.../apache/qpid/transport/MessageTransfer.java | 70 +-
.../java/org/apache/qpid/transport/Method.java | 13 +-
.../apache/qpid/transport/ProtocolHeader.java | 7 +-
.../qpid/transport/network/Assembler.java | 4 +-
.../qpid/transport/network/Disassembler.java | 74 +-
.../qpid/transport/network/io/IoSender.java | 10 +-
.../network/security/sasl/SASLSender.java | 13 +-
.../network/security/ssl/SSLSender.java | 38 +-
.../apache/qpid/transport/util/Functions.java | 55 --
.../org/apache/qpid/util/ByteBufferUtils.java | 86 +-
.../QpidByteBufferOutputStreamTest.java | 114 ---
.../qpid/bytebuffer/QpidByteBufferTest.java | 917 -------------------
.../message/Encrypted091MessageFactoryTest.java | 3 +-
.../org/apache/qpid/codec/AMQDecoderTest.java | 15 +-
.../BasicContentHeaderPropertiesTest.java | 5 +-
.../apache/qpid/framing/EncodingUtilsTest.java | 26 +-
.../org/apache/qpid/framing/FieldTableTest.java | 10 +-
109 files changed, 663 insertions(+), 3364 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java b/client/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
deleted file mode 100644
index cb0b5ba..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/BufferPool.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.bytebuffer;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.ConcurrentLinkedQueue;
-
-class BufferPool
-{
- private final int _maxSize;
- private final ConcurrentLinkedQueue<ByteBuffer> _pooledBuffers = new ConcurrentLinkedQueue<>();
-
- BufferPool(final int maxSize)
- {
- _maxSize = maxSize;
- }
-
- ByteBuffer getBuffer()
- {
- return _pooledBuffers.poll();
- }
-
- void returnBuffer(ByteBuffer buf)
- {
- buf.clear();
- if (_pooledBuffers.size() < _maxSize)
- {
- _pooledBuffers.add(buf);
- }
- }
-
- public int getMaxSize()
- {
- return _maxSize;
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java b/client/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
deleted file mode 100644
index 30628e4..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/ByteBufferRef.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.bytebuffer;
-
-import java.nio.ByteBuffer;
-
-public interface ByteBufferRef
-{
- void incrementRef();
-
- void decrementRef();
-
- ByteBuffer getBuffer();
-
- void removeFromPool();
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java b/client/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
deleted file mode 100644
index 6f0b8af..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/NonPooledByteBufferRef.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.bytebuffer;
-
-import java.nio.ByteBuffer;
-
-class NonPooledByteBufferRef implements ByteBufferRef
-{
- private final ByteBuffer _buffer;
-
- NonPooledByteBufferRef(final ByteBuffer buffer)
- {
- _buffer = buffer;
- }
-
- @Override
- public void incrementRef()
- {
-
- }
-
- @Override
- public void decrementRef()
- {
-
- }
-
- @Override
- public ByteBuffer getBuffer()
- {
- return _buffer;
- }
-
- @Override
- public void removeFromPool()
- {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java b/client/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
deleted file mode 100644
index 807dfe9..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/PooledByteBufferRef.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.bytebuffer;
-
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
-class PooledByteBufferRef implements ByteBufferRef
-{
- private static final AtomicIntegerFieldUpdater<PooledByteBufferRef> REF_COUNT = AtomicIntegerFieldUpdater.newUpdater(PooledByteBufferRef.class, "_refCount");
-
- private final ByteBuffer _buffer;
- private volatile int _refCount;
-
- PooledByteBufferRef(final ByteBuffer buffer)
- {
- _buffer = buffer;
- }
-
- @Override
- public void incrementRef()
- {
-
- if(REF_COUNT.get(this) >= 0)
- {
- REF_COUNT.incrementAndGet(this);
- }
- }
-
- @Override
- public void decrementRef()
- {
- if(REF_COUNT.get(this) > 0 && REF_COUNT.decrementAndGet(this) == 0)
- {
- QpidByteBuffer.returnToPool(_buffer);
- }
- }
-
- @Override
- public ByteBuffer getBuffer()
- {
- return _buffer.duplicate();
- }
-
- @Override
- public void removeFromPool()
- {
- REF_COUNT.set(this, Integer.MIN_VALUE/2);
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java b/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
deleted file mode 100644
index eec2a05..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBuffer.java
+++ /dev/null
@@ -1,884 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.bytebuffer;
-
-import java.io.BufferedOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.nio.BufferOverflowException;
-import java.nio.BufferUnderflowException;
-import java.nio.ByteBuffer;
-import java.nio.CharBuffer;
-import java.nio.channels.GatheringByteChannel;
-import java.nio.channels.ReadableByteChannel;
-import java.nio.charset.Charset;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-import java.util.zip.GZIPInputStream;
-import java.util.zip.GZIPOutputStream;
-
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLEngineResult;
-import javax.net.ssl.SSLException;
-
-import org.apache.qpid.streams.CompositeInputStream;
-
-public class QpidByteBuffer
-{
- private static final AtomicIntegerFieldUpdater<QpidByteBuffer>
- DISPOSED_UPDATER = AtomicIntegerFieldUpdater.newUpdater(
- QpidByteBuffer.class,
- "_disposed");
- private static final ThreadLocal<QpidByteBuffer> _cachedBuffer = new ThreadLocal<>();
- private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
- private volatile static boolean _isPoolInitialized;
- private volatile static BufferPool _bufferPool;
- private volatile static int _pooledBufferSize;
- private volatile static ByteBuffer _zeroed;
- private final int _offset;
-
- final ByteBufferRef _ref;
- volatile ByteBuffer _buffer;
- @SuppressWarnings("unused")
- private volatile int _disposed;
-
-
- QpidByteBuffer(ByteBufferRef ref)
- {
- this(ref, ref.getBuffer(), 0);
- }
-
- private QpidByteBuffer(ByteBufferRef ref, ByteBuffer buffer, int offset)
- {
- _ref = ref;
- _buffer = buffer;
- _offset = offset;
- _ref.incrementRef();
- }
-
- public final boolean isDirect()
- {
- return _buffer.isDirect();
- }
-
- public final short getUnsignedByte()
- {
- return (short) (((short) get()) & 0xFF);
- }
-
- public final int getUnsignedShort()
- {
- return ((int) getShort()) & 0xffff;
- }
-
- public final int getUnsignedShort(int pos)
- {
- return ((int) getShort(pos)) & 0xffff;
- }
-
-
- public final long getUnsignedInt()
- {
- return ((long) getInt()) & 0xffffffffL;
- }
-
- public final QpidByteBuffer putUnsignedByte(final short s)
- {
- put((byte) s);
- return this;
- }
-
- public final QpidByteBuffer putUnsignedShort(final int i)
- {
- putShort((short) i);
- return this;
- }
-
- public final QpidByteBuffer putUnsignedInt(final long value)
- {
- putInt((int) value);
- return this;
- }
-
- public final void dispose()
- {
- if (DISPOSED_UPDATER.compareAndSet(this, 0, 1))
- {
- _ref.decrementRef();
- _buffer = null;
- }
- }
-
- public final InputStream asInputStream()
- {
- return new BufferInputStream(this);
- }
-
- public final ByteBuffer asByteBuffer()
- {
- try
- {
- return getUnderlyingBuffer();
- }
- finally
- {
- dispose();
- }
- }
-
- public final CharBuffer decode(Charset charset)
- {
- return charset.decode(getUnderlyingBuffer());
- }
-
- public final int read(ReadableByteChannel channel) throws IOException
- {
- return channel.read(getUnderlyingBuffer());
- }
-
- public final SSLEngineResult decryptSSL(SSLEngine engine, QpidByteBuffer dest) throws SSLException
- {
- return engine.unwrap(getUnderlyingBuffer(), dest.getUnderlyingBuffer());
- }
-
- @Override
- public String toString()
- {
- return "QpidByteBuffer{" +
- "_buffer=" + _buffer +
- ", _disposed=" + _disposed +
- '}';
- }
-
- public final boolean hasRemaining()
- {
- return _buffer.hasRemaining();
- }
-
- public QpidByteBuffer putInt(final int index, final int value)
- {
- _buffer.putInt(index, value);
- return this;
- }
-
- public QpidByteBuffer putShort(final int index, final short value)
- {
- _buffer.putShort(index, value);
- return this;
- }
-
- public QpidByteBuffer putChar(final int index, final char value)
- {
- _buffer.putChar(index, value);
- return this;
- }
-
- public final QpidByteBuffer put(final byte b)
- {
- _buffer.put(b);
- return this;
- }
-
- public QpidByteBuffer put(final int index, final byte b)
- {
- _buffer.put(index, b);
- return this;
- }
-
- public short getShort(final int index)
- {
- return _buffer.getShort(index);
- }
-
- public final QpidByteBuffer mark()
- {
- _buffer.mark();
- return this;
- }
-
- public final long getLong()
- {
- return _buffer.getLong();
- }
-
- public QpidByteBuffer putFloat(final int index, final float value)
- {
- _buffer.putFloat(index, value);
- return this;
- }
-
- public double getDouble(final int index)
- {
- return _buffer.getDouble(index);
- }
-
- public final boolean hasArray()
- {
- return _buffer.hasArray();
- }
-
- public final double getDouble()
- {
- return _buffer.getDouble();
- }
-
- public final QpidByteBuffer putFloat(final float value)
- {
- _buffer.putFloat(value);
- return this;
- }
-
- public final QpidByteBuffer putInt(final int value)
- {
- _buffer.putInt(value);
- return this;
- }
-
- public byte[] array()
- {
- return _buffer.array();
- }
-
- public final QpidByteBuffer putShort(final short value)
- {
- _buffer.putShort(value);
- return this;
- }
-
- public int getInt(final int index)
- {
- return _buffer.getInt(index);
- }
-
- public final int remaining()
- {
- return _buffer.remaining();
- }
-
- public final QpidByteBuffer put(final byte[] src)
- {
- _buffer.put(src);
- return this;
- }
-
- public final QpidByteBuffer put(final ByteBuffer src)
- {
- _buffer.put(src);
- return this;
- }
-
- public final QpidByteBuffer put(final QpidByteBuffer src)
- {
- int sourceRemaining = src.remaining();
- if (sourceRemaining > remaining())
- {
- throw new BufferOverflowException();
- }
-
- _buffer.put(src.getUnderlyingBuffer());
- return this;
- }
-
- public final QpidByteBuffer get(final byte[] dst, final int offset, final int length)
- {
- _buffer.get(dst, offset, length);
- return this;
- }
-
- public final QpidByteBuffer get(final ByteBuffer dst)
- {
- int destinationRemaining = dst.remaining();
- int remaining = remaining();
- if (destinationRemaining < remaining)
- {
- throw new BufferUnderflowException();
- }
- dst.put(_buffer);
- return this;
- }
-
- public final void copyTo(final ByteBuffer dst)
- {
- dst.put(_buffer.duplicate());
- }
-
- public final void putCopyOf(final QpidByteBuffer source)
- {
- int remaining = remaining();
- int sourceRemaining = source.remaining();
- if (sourceRemaining > remaining)
- {
- throw new BufferOverflowException();
- }
-
- put(source.getUnderlyingBuffer().duplicate());
- }
-
- public QpidByteBuffer rewind()
- {
- _buffer.rewind();
- return this;
- }
-
- public QpidByteBuffer clear()
- {
- _buffer.clear();
- return this;
- }
-
- public QpidByteBuffer putLong(final int index, final long value)
- {
- _buffer.putLong(index, value);
- return this;
- }
-
- public QpidByteBuffer compact()
- {
- _buffer.compact();
- return this;
- }
-
- public final QpidByteBuffer putDouble(final double value)
- {
- _buffer.putDouble(value);
- return this;
- }
-
- public int limit()
- {
- return _buffer.limit();
- }
-
- public QpidByteBuffer reset()
- {
- _buffer.reset();
- return this;
- }
-
- public QpidByteBuffer flip()
- {
- _buffer.flip();
- return this;
- }
-
- public final short getShort()
- {
- return _buffer.getShort();
- }
-
- public final float getFloat()
- {
- return _buffer.getFloat();
- }
-
- public QpidByteBuffer limit(final int newLimit)
- {
- _buffer.limit(newLimit);
- return this;
- }
-
- /**
- * Method does not respect mark.
- *
- * @return QpidByteBuffer
- */
- public QpidByteBuffer duplicate()
- {
- ByteBuffer buffer = _ref.getBuffer();
- if (!(_ref instanceof PooledByteBufferRef))
- {
- buffer = buffer.duplicate();
- }
-
- buffer.position(_offset );
- buffer.limit(_offset + _buffer.capacity());
-
- buffer = buffer.slice();
-
- buffer.limit(_buffer.limit());
- buffer.position(_buffer.position());
- return new QpidByteBuffer(_ref, buffer, _offset);
- }
-
- public final QpidByteBuffer put(final byte[] src, final int offset, final int length)
- {
- _buffer.put(src, offset, length);
- return this;
- }
-
- public long getLong(final int index)
- {
- return _buffer.getLong(index);
- }
-
- public int capacity()
- {
- return _buffer.capacity();
- }
-
- public char getChar(final int index)
- {
- return _buffer.getChar(index);
- }
-
- public final byte get()
- {
- return _buffer.get();
- }
-
- public byte get(final int index)
- {
- return _buffer.get(index);
- }
-
- public final QpidByteBuffer get(final byte[] dst)
- {
- _buffer.get(dst);
- return this;
- }
-
- public final void copyTo(final byte[] dst)
- {
- if (remaining() < dst.length)
- {
- throw new BufferUnderflowException();
- }
- _buffer.duplicate().get(dst);
- }
-
- public final QpidByteBuffer putChar(final char value)
- {
- _buffer.putChar(value);
- return this;
- }
-
- public QpidByteBuffer position(final int newPosition)
- {
- _buffer.position(newPosition);
- return this;
- }
-
- public int arrayOffset()
- {
- return _buffer.arrayOffset();
- }
-
- public final char getChar()
- {
- return _buffer.getChar();
- }
-
- public final int getInt()
- {
- return _buffer.getInt();
- }
-
- public final QpidByteBuffer putLong(final long value)
- {
- _buffer.putLong(value);
- return this;
- }
-
- public float getFloat(final int index)
- {
- return _buffer.getFloat(index);
- }
-
- public QpidByteBuffer slice()
- {
- return view(0, _buffer.remaining());
- }
-
- public QpidByteBuffer view(int offset, int length)
- {
- ByteBuffer buffer = _ref.getBuffer();
- if (!(_ref instanceof PooledByteBufferRef))
- {
- buffer = buffer.duplicate();
- }
-
- int newRemaining = Math.min(_buffer.remaining() - offset, length);
-
- int newPosition = _offset + _buffer.position() + offset;
- buffer.limit(newPosition + newRemaining);
- buffer.position(newPosition);
-
- buffer = buffer.slice();
-
- return new QpidByteBuffer(_ref, buffer, newPosition);
- }
-
- public int position()
- {
- return _buffer.position();
- }
-
- public QpidByteBuffer putDouble(final int index, final double value)
- {
- _buffer.putDouble(index, value);
- return this;
- }
-
- ByteBuffer getUnderlyingBuffer()
- {
- return _buffer;
- }
-
- public static QpidByteBuffer allocate(boolean direct, int size)
- {
- return direct ? allocateDirect(size) : allocate(size);
- }
-
- public static QpidByteBuffer allocate(int size)
- {
- return new QpidByteBuffer(new NonPooledByteBufferRef(ByteBuffer.allocate(size)));
- }
-
- public static QpidByteBuffer allocateDirect(int size)
- {
- if (size < 0)
- {
- throw new IllegalArgumentException("Cannot allocate QpidByteBuffer with size "
- + size
- + " which is negative.");
- }
-
- final ByteBufferRef ref;
- if (_isPoolInitialized && _pooledBufferSize >= size)
- {
- if (_pooledBufferSize == size)
- {
- ByteBuffer buf = _bufferPool.getBuffer();
- if (buf == null)
- {
- buf = ByteBuffer.allocateDirect(size);
- }
- ref = new PooledByteBufferRef(buf);
- }
- else
- {
- QpidByteBuffer buf = _cachedBuffer.get();
- if (buf == null || buf.remaining() < size)
- {
- if (buf != null)
- {
- buf.dispose();
- }
- buf = allocateDirect(_pooledBufferSize);
- }
- QpidByteBuffer rVal = buf.view(0, size);
- buf.position(buf.position() + size);
-
- _cachedBuffer.set(buf);
- return rVal;
- }
- }
- else
- {
- ref = new NonPooledByteBufferRef(ByteBuffer.allocateDirect(size));
- }
- return new QpidByteBuffer(ref);
- }
-
- public static Collection<QpidByteBuffer> allocateDirectCollection(int size)
- {
- if (_pooledBufferSize == 0)
- {
- return Collections.singleton(allocateDirect(size));
- }
- else
- {
- List<QpidByteBuffer> buffers = new ArrayList<>((size / _pooledBufferSize) + 2);
- int remaining = size;
-
- QpidByteBuffer buf = _cachedBuffer.get();
- if (buf == null)
- {
- buf = allocateDirect(_pooledBufferSize);
- }
- while (remaining > buf.remaining())
- {
- int bufRemaining = buf.remaining();
- if (buf == _cachedBuffer.get())
- {
- buffers.add(buf.view(0, bufRemaining));
- buf.dispose();
- }
- else
- {
- buffers.add(buf);
- }
- remaining -= bufRemaining;
- buf = allocateDirect(_pooledBufferSize);
- }
- buffers.add(buf.view(0, remaining));
- buf.position(buf.position() + remaining);
-
- if (buf.hasRemaining())
- {
- _cachedBuffer.set(buf);
- }
- else
- {
- _cachedBuffer.set(allocateDirect(_pooledBufferSize));
- buf.dispose();
- }
- return buffers;
- }
- }
-
- public static SSLEngineResult encryptSSL(SSLEngine engine,
- final Collection<QpidByteBuffer> buffers,
- QpidByteBuffer dest) throws SSLException
- {
- final ByteBuffer[] src;
- // QPID-7447: prevent unnecessary allocations
- if (buffers.isEmpty())
- {
- src = EMPTY_BYTE_BUFFER_ARRAY;
- }
- else
- {
- src = new ByteBuffer[buffers.size()];
- Iterator<QpidByteBuffer> iterator = buffers.iterator();
- for (int i = 0; i < src.length; i++)
- {
- src[i] = iterator.next().getUnderlyingBuffer();
- }
- }
- return engine.wrap(src, dest.getUnderlyingBuffer());
- }
-
- public static Collection<QpidByteBuffer> inflate(Collection<QpidByteBuffer> compressedBuffers) throws IOException
- {
- if (compressedBuffers == null)
- {
- throw new IllegalArgumentException("compressedBuffers cannot be null");
- }
-
- boolean isDirect = false;
- Collection<InputStream> streams = new ArrayList<>(compressedBuffers.size());
- for (QpidByteBuffer buffer : compressedBuffers)
- {
- isDirect = isDirect || buffer.isDirect();
- streams.add(buffer.asInputStream());
- }
- final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
-
- Collection<QpidByteBuffer> uncompressedBuffers = new ArrayList<>();
- try (GZIPInputStream gzipInputStream = new GZIPInputStream(new CompositeInputStream(streams)))
- {
- byte[] buf = new byte[bufferSize];
- int read;
- while ((read = gzipInputStream.read(buf)) != -1)
- {
- QpidByteBuffer output = isDirect ? allocateDirect(read) : allocate(read);
- output.put(buf, 0, read);
- output.flip();
- uncompressedBuffers.add(output);
- }
- return uncompressedBuffers;
- }
- catch (IOException e)
- {
- for (QpidByteBuffer uncompressedBuffer : uncompressedBuffers)
- {
- uncompressedBuffer.dispose();
- }
- throw e;
- }
- }
-
- public static Collection<QpidByteBuffer> deflate(Collection<QpidByteBuffer> uncompressedBuffers) throws IOException
- {
- if (uncompressedBuffers == null)
- {
- throw new IllegalArgumentException("uncompressedBuffers cannot be null");
- }
-
- boolean isDirect = false;
- Collection<InputStream> streams = new ArrayList<>(uncompressedBuffers.size());
- for (QpidByteBuffer buffer : uncompressedBuffers)
- {
- isDirect = isDirect || buffer.isDirect();
- streams.add(buffer.asInputStream());
- }
- final int bufferSize = (isDirect && _pooledBufferSize > 0) ? _pooledBufferSize : 65536;
-
- try(QpidByteBufferOutputStream compressedOutput = new QpidByteBufferOutputStream(isDirect, bufferSize);
- InputStream compressedInput = new CompositeInputStream(streams);
- GZIPOutputStream gzipStream = new GZIPOutputStream(new BufferedOutputStream(compressedOutput, bufferSize)))
- {
- byte[] buf = new byte[16384];
- int read;
- while ((read = compressedInput.read(buf)) > -1)
- {
- gzipStream.write(buf, 0, read);
- }
- gzipStream.finish();
- gzipStream.flush();
- return compressedOutput.fetchAccumulatedBuffers();
- }
- }
-
- public static long write(GatheringByteChannel channel, Collection<QpidByteBuffer> qpidByteBuffers)
- throws IOException
- {
- ByteBuffer[] byteBuffers = new ByteBuffer[qpidByteBuffers.size()];
- Iterator<QpidByteBuffer> iterator = qpidByteBuffers.iterator();
- for (int i = 0; i < byteBuffers.length; i++)
- {
- byteBuffers[i] = iterator.next().getUnderlyingBuffer();
- }
- return channel.write(byteBuffers);
- }
-
- public static QpidByteBuffer wrap(final ByteBuffer wrap)
- {
- return new QpidByteBuffer(new NonPooledByteBufferRef(wrap));
- }
-
- public static QpidByteBuffer wrap(final byte[] data)
- {
- return wrap(ByteBuffer.wrap(data));
- }
-
- public static QpidByteBuffer wrap(final byte[] data, int offset, int length)
- {
- return wrap(ByteBuffer.wrap(data, offset, length));
- }
-
- static void returnToPool(final ByteBuffer buffer)
- {
- buffer.clear();
- final ByteBuffer duplicate = _zeroed.duplicate();
- duplicate.limit(buffer.capacity());
- buffer.put(duplicate);
-
- _bufferPool.returnBuffer(buffer);
- }
-
- public synchronized static void initialisePool(int bufferSize, int maxPoolSize)
- {
- if (_isPoolInitialized && (bufferSize != _pooledBufferSize || maxPoolSize != _bufferPool.getMaxSize()))
- {
- final String errorMessage = String.format(
- "QpidByteBuffer pool has already been initialised with bufferSize=%d and maxPoolSize=%d." +
- "Re-initialisation with different bufferSize=%d and maxPoolSize=%d is not allowed.",
- _pooledBufferSize,
- _bufferPool.getMaxSize(),
- bufferSize,
- maxPoolSize);
- throw new IllegalStateException(errorMessage);
- }
- if (bufferSize <= 0)
- {
- throw new IllegalArgumentException("Negative or zero bufferSize illegal : " + bufferSize);
- }
-
- _bufferPool = new BufferPool(maxPoolSize);
- _pooledBufferSize = bufferSize;
- _zeroed = ByteBuffer.allocateDirect(_pooledBufferSize);
- _isPoolInitialized = true;
- }
-
- public static int getPooledBufferSize()
- {
- return _pooledBufferSize;
- }
-
- private static final class BufferInputStream extends InputStream
- {
- private final QpidByteBuffer _qpidByteBuffer;
-
- private BufferInputStream(final QpidByteBuffer buffer)
- {
- _qpidByteBuffer = buffer;
- }
-
- @Override
- public int read() throws IOException
- {
- if (_qpidByteBuffer.hasRemaining())
- {
- return _qpidByteBuffer.get() & 0xFF;
- }
- return -1;
- }
-
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- if (!_qpidByteBuffer.hasRemaining())
- {
- return -1;
- }
- if (_qpidByteBuffer.remaining() < len)
- {
- len = _qpidByteBuffer.remaining();
- }
- _qpidByteBuffer.get(b, off, len);
-
- return len;
- }
-
- @Override
- public void mark(int readlimit)
- {
- _qpidByteBuffer.mark();
- }
-
- @Override
- public void reset() throws IOException
- {
- _qpidByteBuffer.reset();
- }
-
- @Override
- public boolean markSupported()
- {
- return true;
- }
-
- @Override
- public long skip(long n) throws IOException
- {
- _qpidByteBuffer.position(_qpidByteBuffer.position() + (int) n);
- return n;
- }
-
- @Override
- public int available() throws IOException
- {
- return _qpidByteBuffer.remaining();
- }
-
- @Override
- public void close()
- {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java b/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
deleted file mode 100644
index 54ed85a..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferInputStream.java
+++ /dev/null
@@ -1,109 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.qpid.bytebuffer;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-
-import org.apache.qpid.streams.CompositeInputStream;
-
-/**
- * InputStream implementation that takes a list QpidByteBuffers.
- * The QpidByteBufferInputStream takes ownership of the buffers and disposes them on close().
- *
- * Not thread safe.
- */
-public class QpidByteBufferInputStream extends InputStream
-{
- private final CompositeInputStream _compositeInputStream;
- private final Collection<QpidByteBuffer> _buffers;
-
- public QpidByteBufferInputStream(Collection<QpidByteBuffer> buffers)
- {
- _buffers = buffers;
-
- final Collection<InputStream> streams = new ArrayList<>(buffers.size());
- for (QpidByteBuffer buffer : buffers)
- {
- streams.add(buffer.asInputStream());
- }
- _compositeInputStream = new CompositeInputStream(streams);
- }
-
- @Override
- public int read() throws IOException
- {
- return _compositeInputStream.read();
- }
-
- @Override
- public int read(byte[] b, int off, int len) throws IOException
- {
- return _compositeInputStream.read(b, off, len);
- }
-
- @Override
- public void mark(int readlimit)
- {
- _compositeInputStream.mark(readlimit);
- }
-
- @Override
- public void reset() throws IOException
- {
- _compositeInputStream.reset();
- }
-
- @Override
- public boolean markSupported()
- {
- return _compositeInputStream.markSupported();
- }
-
- @Override
- public long skip(long n) throws IOException
- {
- return _compositeInputStream.skip(n);
- }
-
- @Override
- public int available() throws IOException
- {
- return _compositeInputStream.available();
- }
-
- @Override
- public void close() throws IOException
- {
- try
- {
- _compositeInputStream.close();
- }
- finally
- {
- for (QpidByteBuffer buffer : _buffers)
- {
- buffer.dispose();
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java b/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
deleted file mode 100644
index e772809..0000000
--- a/client/src/main/java/org/apache/qpid/bytebuffer/QpidByteBufferOutputStream.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid.bytebuffer;
-
-import java.io.IOException;
-import java.io.OutputStream;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.LinkedList;
-
-/**
- * OutputStream implementation that yields a list QpidByteBuffers that contain a copy
- * of the incoming bytes. Use fetchAccumulatedBuffers to get the buffers. Caller
- * has responsibility to dispose the buffers after use.
- *
- * It will be normally be desirable to front this stream with java.io.BufferedOutputStream
- * to minimise the number of write and thus the number of buffers created.
- *
- * Not thread safe.
- */
-public class QpidByteBufferOutputStream extends OutputStream
-{
- private final LinkedList<QpidByteBuffer> _buffers = new LinkedList<>();
- private final boolean _isDirect;
- private final int _maximumBufferSize;
- private boolean _closed;
-
- public QpidByteBufferOutputStream(final boolean isDirect, final int maximumBufferSize)
- {
- if (maximumBufferSize <= 0)
- {
- throw new IllegalArgumentException("Negative or zero maximumBufferSize illegal : " + maximumBufferSize);
- }
- _isDirect = isDirect;
- _maximumBufferSize = maximumBufferSize;
- }
-
- @Override
- public void write(int b) throws IOException
- {
- int size = 1;
- byte[] data = new byte[] {(byte)b};
- allocateDataBuffers(data, 0, size);
- }
-
- @Override
- public void write(byte[] data) throws IOException
- {
- write(data, 0, data.length);
- }
-
- @Override
- public void write(byte[] data, int offset, int len) throws IOException
- {
- allocateDataBuffers(data, offset, len);
- }
-
- @Override
- public void close() throws IOException
- {
- _closed = true;
- for (QpidByteBuffer buffer : _buffers)
- {
- buffer.dispose();
- }
- _buffers.clear();
- }
-
- public Collection<QpidByteBuffer> fetchAccumulatedBuffers()
- {
- Collection<QpidByteBuffer> bufs = new ArrayList<>(_buffers);
- _buffers.clear();
- return bufs;
- }
-
- private void allocateDataBuffers(byte[] data, int offset, int len) throws IOException
- {
- if (_closed)
- {
- throw new IOException("Stream is closed");
- }
-
- int size = Math.min(_maximumBufferSize, len);
-
- QpidByteBuffer current = _isDirect ? QpidByteBuffer.allocateDirect(len) : QpidByteBuffer.allocate(len);
- current.put(data, offset, size);
- current.flip();
- _buffers.add(current);
- if (len > size)
- {
- allocateDataBuffers(data, offset + size, len - size);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index a7dd2dc..41166e0 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -38,7 +38,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegate_0_8;
import org.apache.qpid.client.message.AbstractJMSMessage;
@@ -239,7 +238,7 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
final int headerLength = contentHeaderProperties.getPropertyListSize() + 2;
byte[] unencryptedBytes = new byte[headerLength + size];
- QpidByteBuffer output = QpidByteBuffer.wrap(unencryptedBytes);
+ ByteBuffer output = ByteBuffer.wrap(unencryptedBytes);
output.putShort((short) (contentHeaderProperties.getPropertyFlags() & 0xffff));
contentHeaderProperties.writePropertyListPayload(output);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java b/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
index c4cf72a..8bb9178 100644
--- a/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
+++ b/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
@@ -24,7 +24,6 @@ import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
-import java.util.Collection;
import java.util.Iterator;
import java.util.List;
@@ -34,7 +33,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession_0_8;
import org.apache.qpid.client.AMQTopic;
@@ -78,7 +76,7 @@ public abstract class AbstractJMSMessageFactory
_logger.debug("Non-fragmented message body (bodySize=" + contentHeader.getBodySize() + ")");
}
- data = ((ContentBody) bodies.get(0)).getPayload().asByteBuffer().duplicate();
+ data = ((ContentBody) bodies.get(0)).getPayload().duplicate();
}
else if (bodies != null)
{
@@ -93,7 +91,7 @@ public abstract class AbstractJMSMessageFactory
while (it.hasNext())
{
ContentBody cb = (ContentBody) it.next();
- final ByteBuffer payload = cb.getPayload().asByteBuffer().duplicate();
+ final ByteBuffer payload = cb.getPayload().duplicate();
if (payload.isDirect() || payload.isReadOnly())
{
data.put(payload);
@@ -133,24 +131,16 @@ public abstract class AbstractJMSMessageFactory
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps,
DeliveryProperties deliveryProps,
- Collection<QpidByteBuffer> body) throws QpidException
+ ByteBuffer body) throws QpidException
{
ByteBuffer data;
final boolean debug = _logger.isDebugEnabled();
- if (body != null && body.size() != 0)
+ if (body != null && body.remaining() != 0)
{
- int size = 0;
- for(QpidByteBuffer b : body)
- {
- size += b.remaining();
- }
- data = ByteBuffer.allocate(size);
- for(QpidByteBuffer b : body)
- {
- b.get(data);
- }
+ data = ByteBuffer.allocate(body.remaining());
+ data.put(body);
data.flip();
}
else // body == null
@@ -192,7 +182,7 @@ public abstract class AbstractJMSMessageFactory
}
public AbstractJMSMessage createMessage(long messageNbr, boolean redelivered, MessageProperties msgProps,
- DeliveryProperties deliveryProps, Collection<QpidByteBuffer> body)
+ DeliveryProperties deliveryProps, ByteBuffer body)
throws JMSException, QpidException
{
final AbstractJMSMessage msg =
@@ -205,7 +195,7 @@ public abstract class AbstractJMSMessageFactory
private class BodyInputStream extends InputStream
{
private final Iterator<ContentBody> _bodiesIter;
- private QpidByteBuffer _currentBuffer;
+ private ByteBuffer _currentBuffer;
public BodyInputStream(final List<ContentBody> bodies)
{
_bodiesIter = bodies.iterator();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java b/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
index 187fbaf..3bae2d6 100644
--- a/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
+++ b/client/src/main/java/org/apache/qpid/client/message/Encrypted091MessageFactory.java
@@ -39,7 +39,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.framing.BasicContentHeaderProperties;
@@ -143,7 +142,7 @@ public class Encrypted091MessageFactory extends AbstractJMSMessageFactory
BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
int payloadOffset;
- QpidByteBuffer dataInput = QpidByteBuffer.wrap(unencryptedBytes);
+ ByteBuffer dataInput = ByteBuffer.wrap(unencryptedBytes);
payloadOffset = properties.read(dataInput);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java b/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java
index e0f04f7..febf46b 100644
--- a/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java
+++ b/client/src/main/java/org/apache/qpid/codec/AMQDecoder.java
@@ -20,12 +20,21 @@
*/
package org.apache.qpid.codec;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.AMQFrameDecodingException;
+import org.apache.qpid.framing.AMQProtocolVersionException;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.HeartbeatBody;
+import org.apache.qpid.framing.MethodProcessor;
+import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.protocol.ErrorCodes;
+import org.apache.qpid.util.ByteBufferUtils;
/**
* AMQDecoder delegates the decoding of AMQP either to a data block decoder, or in the case of new connections, to a
@@ -91,7 +100,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
return _methodProcessor;
}
- protected final int decode(final QpidByteBuffer buf) throws AMQFrameDecodingException
+ protected final int decode(final ByteBuffer buf) throws AMQFrameDecodingException
{
// If this is the first read then we may be getting a protocol initiation back if we tried to negotiate
// an unsupported version
@@ -124,7 +133,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
return buf.hasRemaining() ? required : 0;
}
- protected int processAMQPFrames(final QpidByteBuffer buf) throws AMQFrameDecodingException
+ protected int processAMQPFrames(final ByteBuffer buf) throws AMQFrameDecodingException
{
final int required = decodable(buf);
if (required == 0)
@@ -134,7 +143,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
return required;
}
- protected int decodable(final QpidByteBuffer in) throws AMQFrameDecodingException
+ protected int decodable(final ByteBuffer in) throws AMQFrameDecodingException
{
final int remainingAfterAttributes = in.remaining() - FRAME_HEADER_SIZE;
// type, channel, body length and end byte
@@ -160,13 +169,13 @@ public abstract class AMQDecoder<T extends MethodProcessor>
}
- protected void processInput(final QpidByteBuffer in)
+ protected void processInput(final ByteBuffer in)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
final byte type = in.get();
- final int channel = in.getUnsignedShort();
- final long bodySize = in.getUnsignedInt();
+ final int channel = ByteBufferUtils.getUnsignedShort(in);
+ final long bodySize = ByteBufferUtils.getUnsignedInt(in);
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
@@ -188,7 +197,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
}
- protected void processFrame(final int channel, final byte type, final long bodySize, final QpidByteBuffer in)
+ protected void processFrame(final int channel, final byte type, final long bodySize, final ByteBuffer in)
throws AMQFrameDecodingException
{
switch (type)
@@ -211,9 +220,7 @@ public abstract class AMQDecoder<T extends MethodProcessor>
}
- abstract void processMethod(int channelId,
- QpidByteBuffer in)
- throws AMQFrameDecodingException;
+ abstract void processMethod(int channelId, ByteBuffer in) throws AMQFrameDecodingException;
AMQFrameDecodingException newUnknownMethodException(final int classId,
final int methodId,
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java b/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java
index fe9cbb4..9d31dab 100644
--- a/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java
+++ b/client/src/main/java/org/apache/qpid/codec/ClientDecoder.java
@@ -22,12 +22,11 @@ package org.apache.qpid.codec;
import java.nio.ByteBuffer;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.framing.*;
public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends ClientChannelMethodProcessor>>
{
- private QpidByteBuffer _incompleteBuffer;
+ private ByteBuffer _incompleteBuffer;
/**
* Creates a new AMQP decoder.
@@ -43,14 +42,12 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
{
if (_incompleteBuffer == null)
{
- QpidByteBuffer qpidByteBuffer = QpidByteBuffer.wrap(incomingBuffer);
- final int required = decode(qpidByteBuffer);
+ final int required = decode(incomingBuffer);
if (required != 0)
{
- _incompleteBuffer = QpidByteBuffer.allocate(qpidByteBuffer.remaining() + required);
- _incompleteBuffer.put(qpidByteBuffer);
+ _incompleteBuffer = ByteBuffer.allocate(incomingBuffer.remaining() + required);
+ _incompleteBuffer.put(incomingBuffer);
}
- qpidByteBuffer.dispose();
}
else
{
@@ -61,33 +58,29 @@ public class ClientDecoder extends AMQDecoder<ClientMethodProcessor<? extends Cl
else
{
_incompleteBuffer.flip();
- final QpidByteBuffer aggregatedBuffer =
- QpidByteBuffer.allocate(_incompleteBuffer.remaining() + incomingBuffer.remaining());
+ final ByteBuffer aggregatedBuffer =
+ ByteBuffer.allocate(_incompleteBuffer.remaining() + incomingBuffer.remaining());
aggregatedBuffer.put(_incompleteBuffer);
aggregatedBuffer.put(incomingBuffer);
aggregatedBuffer.flip();
final int required = decode(aggregatedBuffer);
- _incompleteBuffer.dispose();
if (required != 0)
{
- _incompleteBuffer = QpidByteBuffer.allocate(aggregatedBuffer.remaining() + required);
+ _incompleteBuffer = ByteBuffer.allocate(aggregatedBuffer.remaining() + required);
_incompleteBuffer.put(aggregatedBuffer);
}
else
{
_incompleteBuffer = null;
}
- aggregatedBuffer.dispose();
}
}
// post-condition: assert(!incomingBuffer.hasRemaining());
}
@Override
- void processMethod(int channelId,
- QpidByteBuffer in)
- throws AMQFrameDecodingException
+ void processMethod(int channelId, ByteBuffer in) throws AMQFrameDecodingException
{
ClientMethodProcessor<? extends ClientChannelMethodProcessor> methodProcessor = getMethodProcessor();
ClientChannelMethodProcessor channelMethodProcessor = methodProcessor.getChannelMethodProcessor(channelId);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/codec/ServerDecoder.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/codec/ServerDecoder.java b/client/src/main/java/org/apache/qpid/codec/ServerDecoder.java
deleted file mode 100644
index 98b3caf..0000000
--- a/client/src/main/java/org/apache/qpid/codec/ServerDecoder.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.codec;
-
-import java.io.IOException;
-
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.framing.*;
-
-public class ServerDecoder extends AMQDecoder<ServerMethodProcessor<? extends ServerChannelMethodProcessor>>
-{
-
- /**
- * Creates a new AMQP decoder.
- *
- * @param methodProcessor method processor
- */
- public ServerDecoder(final ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor)
- {
- super(true, methodProcessor);
- }
-
- public void decodeBuffer(QpidByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
- {
- decode(buf);
- }
-
-
- void processMethod(int channelId,
- QpidByteBuffer in)
- throws AMQFrameDecodingException
- {
- ServerMethodProcessor<? extends ServerChannelMethodProcessor> methodProcessor = getMethodProcessor();
- final int classAndMethod = in.getInt();
- int classId = classAndMethod >> 16;
- int methodId = classAndMethod & 0xFFFF;
- methodProcessor.setCurrentMethod(classId, methodId);
- try
- {
- switch (classAndMethod)
- {
- //CONNECTION_CLASS:
- case 0x000a000b:
- ConnectionStartOkBody.process(in, methodProcessor);
- break;
- case 0x000a0015:
- ConnectionSecureOkBody.process(in, methodProcessor);
- break;
- case 0x000a001f:
- ConnectionTuneOkBody.process(in, methodProcessor);
- break;
- case 0x000a0028:
- ConnectionOpenBody.process(in, methodProcessor);
- break;
- case 0x000a0032:
- if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
- {
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
- }
- else
- {
- ConnectionCloseBody.process(in, methodProcessor);
- }
- break;
- case 0x000a0033:
- if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
- {
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
- }
- else
- {
- methodProcessor.receiveConnectionCloseOk();
- }
- break;
- case 0x000a003c:
- if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
- {
- ConnectionCloseBody.process(in, methodProcessor);
- }
- else
- {
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
- }
- break;
- case 0x000a003d:
- if (methodProcessor.getProtocolVersion().equals(ProtocolVersion.v0_8))
- {
- methodProcessor.receiveConnectionCloseOk();
- }
- else
- {
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
- }
- break;
-
- // CHANNEL_CLASS:
-
- case 0x0014000a:
- ChannelOpenBody.process(channelId, in, methodProcessor);
- break;
- case 0x00140014:
- ChannelFlowBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00140015:
- ChannelFlowOkBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00140028:
- ChannelCloseBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00140029:
- methodProcessor.getChannelMethodProcessor(channelId).receiveChannelCloseOk();
- break;
-
- // ACCESS_CLASS:
-
- case 0x001e000a:
- AccessRequestBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
- // EXCHANGE_CLASS:
-
- case 0x0028000a:
- ExchangeDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00280014:
- ExchangeDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00280016:
- ExchangeBoundBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
-
- // QUEUE_CLASS:
-
- case 0x0032000a:
- QueueDeclareBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00320014:
- QueueBindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x0032001e:
- QueuePurgeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00320028:
- QueueDeleteBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x00320032:
- QueueUnbindBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
-
- // BASIC_CLASS:
-
- case 0x003c000a:
- BasicQosBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0014:
- BasicConsumeBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c001e:
- BasicCancelBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0028:
- BasicPublishBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0046:
- BasicGetBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0050:
- BasicAckBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c005a:
- BasicRejectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0064:
- BasicRecoverBody.process(in, methodProcessor.getProtocolVersion(),
- methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0066:
- BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c006e:
- BasicRecoverSyncBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
- case 0x003c0078:
- BasicNackBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
- // CONFIRM CLASS:
-
- case 0x0055000a:
- ConfirmSelectBody.process(in, methodProcessor.getChannelMethodProcessor(channelId));
- break;
-
- // TX_CLASS:
-
- case 0x005a000a:
- if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
- {
- methodProcessor.getChannelMethodProcessor(channelId).receiveTxSelect();
- }
- break;
- case 0x005a0014:
- if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
- {
- methodProcessor.getChannelMethodProcessor(channelId).receiveTxCommit();
- }
- break;
- case 0x005a001e:
- if(!methodProcessor.getChannelMethodProcessor(channelId).ignoreAllButCloseOk())
- {
- methodProcessor.getChannelMethodProcessor(channelId).receiveTxRollback();
- }
- break;
-
-
- default:
- throw newUnknownMethodException(classId, methodId,
- methodProcessor.getProtocolVersion());
-
- }
- }
- finally
- {
- methodProcessor.setCurrentMethod(0, 0);
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AMQFrame.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AMQFrame.java b/client/src/main/java/org/apache/qpid/framing/AMQFrame.java
index e838dd4..1a13795 100644
--- a/client/src/main/java/org/apache/qpid/framing/AMQFrame.java
+++ b/client/src/main/java/org/apache/qpid/framing/AMQFrame.java
@@ -20,8 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferUtils;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -48,7 +50,7 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
}
- private static final QpidByteBuffer FRAME_END_BYTE_BUFFER = QpidByteBuffer.allocateDirect(1);
+ private static final ByteBuffer FRAME_END_BYTE_BUFFER = ByteBuffer.allocate(1);
static
{
FRAME_END_BYTE_BUFFER.put(FRAME_END_BYTE);
@@ -58,14 +60,13 @@ public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
@Override
public long writePayload(final ByteBufferSender sender)
{
- QpidByteBuffer frameHeader = QpidByteBuffer.allocate(sender.isDirectBufferPreferred(), HEADER_SIZE);
+ ByteBuffer frameHeader = ByteBuffer.allocate(HEADER_SIZE);
frameHeader.put(_bodyFrame.getFrameType());
- frameHeader.putUnsignedShort(_channel);
- frameHeader.putUnsignedInt((long) _bodyFrame.getSize());
+ ByteBufferUtils.putUnsignedShort(frameHeader, _channel);
+ ByteBufferUtils.putUnsignedInt(frameHeader, _bodyFrame.getSize());
frameHeader.flip();
sender.send(frameHeader);
- frameHeader.dispose();
long size = 8 + _bodyFrame.writePayload(sender);
sender.send(FRAME_END_BYTE_BUFFER.duplicate());
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java b/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
index a765288..8ecda94 100644
--- a/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
+++ b/client/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
@@ -20,13 +20,15 @@
*/
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferUtils;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -74,17 +76,16 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
final int size = getSize();
- QpidByteBuffer buf = QpidByteBuffer.allocate(sender.isDirectBufferPreferred(), size);
- buf.putUnsignedShort(getClazz());
- buf.putUnsignedShort(getMethod());
+ ByteBuffer buf = ByteBuffer.allocate(size);
+ ByteBufferUtils.putUnsignedShort(buf, getClazz());
+ ByteBufferUtils.putUnsignedShort(buf, getMethod());
writeMethodPayload(buf);
buf.flip();
sender.send(buf);
- buf.dispose();
return size;
}
- abstract protected void writeMethodPayload(QpidByteBuffer buffer);
+ abstract protected void writeMethodPayload(ByteBuffer buffer);
protected int getSizeOf(AMQShortString string)
@@ -92,18 +93,18 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return EncodingUtils.encodedShortStringLength(string);
}
- protected void writeByte(QpidByteBuffer buffer, byte b)
+ protected void writeByte(ByteBuffer buffer, byte b)
{
buffer.put(b);
}
- protected void writeAMQShortString(QpidByteBuffer buffer, AMQShortString string)
+ protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
{
EncodingUtils.writeShortStringBytes(buffer, string);
}
- protected void writeInt(QpidByteBuffer buffer, int i)
+ protected void writeInt(ByteBuffer buffer, int i)
{
buffer.putInt(i);
}
@@ -114,12 +115,12 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
}
- protected void writeFieldTable(QpidByteBuffer buffer, FieldTable table)
+ protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
{
EncodingUtils.writeFieldTableBytes(buffer, table);
}
- protected void writeLong(QpidByteBuffer buffer, long l)
+ protected void writeLong(ByteBuffer buffer, long l)
{
buffer.putLong(l);
}
@@ -130,34 +131,34 @@ public abstract class AMQMethodBodyImpl implements AMQMethodBody
return (response == null) ? 4 : response.length + 4;
}
- protected void writeBytes(QpidByteBuffer buffer, byte[] data)
+ protected void writeBytes(ByteBuffer buffer, byte[] data)
{
EncodingUtils.writeBytes(buffer,data);
}
- protected void writeShort(QpidByteBuffer buffer, short s)
+ protected void writeShort(ByteBuffer buffer, short s)
{
buffer.putShort(s);
}
- protected void writeBitfield(QpidByteBuffer buffer, byte bitfield0)
+ protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
{
buffer.put(bitfield0);
}
- protected void writeUnsignedShort(QpidByteBuffer buffer, int s)
+ protected void writeUnsignedShort(ByteBuffer buffer, int s)
{
- buffer.putUnsignedShort(s);
+ ByteBufferUtils.putUnsignedShort(buffer, s);
}
- protected void writeUnsignedInteger(QpidByteBuffer buffer, long i)
+ protected void writeUnsignedInteger(ByteBuffer buffer, long i)
{
- buffer.putUnsignedInt(i);
+ ByteBufferUtils.putUnsignedInt(buffer, i);
}
- protected void writeUnsignedByte(QpidByteBuffer buffer, short unsignedByte)
+ protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
{
- buffer.putUnsignedByte(unsignedByte);
+ ByteBufferUtils.putUnsignedByte(buffer, unsignedByte);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AMQShortString.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AMQShortString.java b/client/src/main/java/org/apache/qpid/framing/AMQShortString.java
index a2ce6fd..9d2ba59 100644
--- a/client/src/main/java/org/apache/qpid/framing/AMQShortString.java
+++ b/client/src/main/java/org/apache/qpid/framing/AMQShortString.java
@@ -28,8 +28,6 @@ import java.util.Arrays;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
/**
* A short string is a representation of an AMQ Short String
* Short strings differ from the Java String class by being limited to on ASCII characters (0-127)
@@ -123,34 +121,6 @@ public final class AMQShortString implements Comparable<AMQShortString>
}
}
- public static AMQShortString readAMQShortString(QpidByteBuffer buffer)
- {
- int length = ((int) buffer.get()) & 0xff;
- if(length == 0)
- {
- return null;
- }
- else
- {
- if (length > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
- if(length > buffer.remaining())
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with length "
- + length + " from a ByteBuffer with only "
- + buffer.remaining()
- + " bytes.");
-
- }
- byte[] data = new byte[length];
- buffer.get(data);
- return new AMQShortString(data, 0, length);
- }
- }
-
-
public AMQShortString(byte[] data, final int offset, final int length)
{
if (length > MAX_LENGTH)
@@ -197,7 +167,7 @@ public final class AMQShortString implements Comparable<AMQShortString>
}
}
- public void writeToBuffer(QpidByteBuffer buffer)
+ public void writeToBuffer(ByteBuffer buffer)
{
final int size = length();
buffer.put((byte)size);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/4] qpid-jms-amqp-0-x git commit: QPID-7725: [Java Client,
AMQP 0-x] Remove QpidByteBuffer
Posted by lq...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java b/client/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
index 0d07817..a263215 100644
--- a/client/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
@@ -20,10 +20,12 @@
*/
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferUtils;
public class ContentHeaderBody implements AMQBody
{
@@ -36,12 +38,12 @@ public class ContentHeaderBody implements AMQBody
/** must never be null */
private final BasicContentHeaderProperties _properties;
- public ContentHeaderBody(QpidByteBuffer buffer, long size) throws AMQFrameDecodingException
+ public ContentHeaderBody(ByteBuffer buffer, long size) throws AMQFrameDecodingException
{
- buffer.getUnsignedShort();
- buffer.getUnsignedShort();
+ ByteBufferUtils.getUnsignedShort(buffer);
+ ByteBufferUtils.getUnsignedShort(buffer);
_bodySize = buffer.getLong();
- int propertyFlags = buffer.getUnsignedShort();
+ int propertyFlags = ByteBufferUtils.getUnsignedShort(buffer);
ContentHeaderPropertiesFactory factory = ContentHeaderPropertiesFactory.getInstance();
_properties = factory.createContentHeaderProperties(CLASS_ID, propertyFlags, buffer, (int)size - 14);
@@ -72,7 +74,7 @@ public class ContentHeaderBody implements AMQBody
* @throws AMQFrameDecodingException if there is a decoding issue
* @throws AMQProtocolVersionException if there is a version issue
*/
- public static ContentHeaderBody createFromBuffer(QpidByteBuffer buffer, long size)
+ public static ContentHeaderBody createFromBuffer(ByteBuffer buffer, long size)
throws AMQFrameDecodingException, AMQProtocolVersionException
{
ContentHeaderBody body = new ContentHeaderBody(buffer, size);
@@ -88,23 +90,22 @@ public class ContentHeaderBody implements AMQBody
@Override
public long writePayload(final ByteBufferSender sender)
{
- QpidByteBuffer data = QpidByteBuffer.allocate(sender.isDirectBufferPreferred(), HEADER_SIZE);
- data.putUnsignedShort(CLASS_ID);
- data.putUnsignedShort(0);
+ ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE);
+ ByteBufferUtils.putUnsignedShort(data, CLASS_ID);
+ ByteBufferUtils.putUnsignedShort(data, 0);
data.putLong(_bodySize);
- data.putUnsignedShort(_properties.getPropertyFlags());
+ ByteBufferUtils.putUnsignedShort(data, _properties.getPropertyFlags());
data.flip();
sender.send(data);
- data.dispose();
return HEADER_SIZE + _properties.writePropertyListPayload(sender);
}
- public long writePayload(final QpidByteBuffer buf)
+ public long writePayload(final ByteBuffer buf)
{
- buf.putUnsignedShort(CLASS_ID);
- buf.putUnsignedShort(0);
+ ByteBufferUtils.putUnsignedShort(buf, CLASS_ID);
+ ByteBufferUtils.putUnsignedShort(buf, 0);
buf.putLong(_bodySize);
- buf.putUnsignedShort(_properties.getPropertyFlags());
+ ByteBufferUtils.putUnsignedShort(buf, _properties.getPropertyFlags());
return HEADER_SIZE + _properties.writePropertyListPayload(buf);
}
@@ -160,15 +161,15 @@ public class ContentHeaderBody implements AMQBody
_bodySize = bodySize;
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ChannelMethodProcessor methodProcessor, final long size)
throws AMQFrameDecodingException
{
- int classId = buffer.getUnsignedShort();
- buffer.getUnsignedShort();
+ int classId = ByteBufferUtils.getUnsignedShort(buffer);
+ ByteBufferUtils.getUnsignedShort(buffer);
long bodySize = buffer.getLong();
- int propertyFlags = buffer.getUnsignedShort();
+ int propertyFlags = ByteBufferUtils.getUnsignedShort(buffer);
BasicContentHeaderProperties properties;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java b/client/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
index 29d8849..dd35127 100644
--- a/client/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
+++ b/client/src/main/java/org/apache/qpid/framing/ContentHeaderPropertiesFactory.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import java.nio.ByteBuffer;
public class ContentHeaderPropertiesFactory
{
@@ -36,7 +36,7 @@ public class ContentHeaderPropertiesFactory
}
public BasicContentHeaderProperties createContentHeaderProperties(int classId, int propertyFlags,
- QpidByteBuffer buffer, int size)
+ ByteBuffer buffer, int size)
throws AMQFrameDecodingException
{
BasicContentHeaderProperties properties;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/EncodingUtils.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/EncodingUtils.java b/client/src/main/java/org/apache/qpid/framing/EncodingUtils.java
index 8f156da..0272f9e 100644
--- a/client/src/main/java/org/apache/qpid/framing/EncodingUtils.java
+++ b/client/src/main/java/org/apache/qpid/framing/EncodingUtils.java
@@ -20,12 +20,13 @@
*/
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class EncodingUtils
{
@@ -119,7 +120,7 @@ public class EncodingUtils
}
}
- public static void writeLongAsShortString(QpidByteBuffer buffer, long l)
+ public static void writeLongAsShortString(ByteBuffer buffer, long l)
{
String s = Long.toString(l);
byte[] encodedString = new byte[1+s.length()];
@@ -134,7 +135,7 @@ public class EncodingUtils
}
- public static void writeShortStringBytes(QpidByteBuffer buffer, AMQShortString s)
+ public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s)
{
if (s != null)
{
@@ -147,18 +148,18 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(QpidByteBuffer buffer, String s)
+ public static void writeLongStringBytes(ByteBuffer buffer, String s)
{
if (s != null)
{
int len = getUTF8Length(s);
- buffer.putUnsignedInt((long) len);
+ ByteBufferUtils.putUnsignedInt(buffer, (long) len);
buffer.put(asUTF8Bytes(s));
}
else
{
- buffer.putUnsignedInt((long) 0);
+ ByteBufferUtils.putUnsignedInt(buffer, (long) 0);
}
}
@@ -167,7 +168,7 @@ public class EncodingUtils
return 4;
}
- public static void writeFieldTableBytes(QpidByteBuffer buffer, FieldTable table)
+ public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table)
{
if (table != null)
{
@@ -175,26 +176,26 @@ public class EncodingUtils
}
else
{
- buffer.putUnsignedInt((long) 0);
+ ByteBufferUtils.putUnsignedInt(buffer, (long) 0);
}
}
- public static void writeLongstr(QpidByteBuffer buffer, byte[] data)
+ public static void writeLongstr(ByteBuffer buffer, byte[] data)
{
if (data != null)
{
- buffer.putUnsignedInt((long) data.length);
+ ByteBufferUtils.putUnsignedInt(buffer, (long) data.length);
buffer.put(data);
}
else
{
- buffer.putUnsignedInt((long) 0);
+ ByteBufferUtils.putUnsignedInt(buffer, (long) 0);
}
}
- public static FieldTable readFieldTable(QpidByteBuffer input) throws AMQFrameDecodingException
+ public static FieldTable readFieldTable(ByteBuffer input) throws AMQFrameDecodingException
{
- long length = input.getUnsignedInt();
+ long length = ByteBufferUtils.getUnsignedInt(input);
if (length == 0)
{
return null;
@@ -206,7 +207,7 @@ public class EncodingUtils
}
- public static String readLongString(QpidByteBuffer buffer)
+ public static String readLongString(ByteBuffer buffer)
{
long length = ((long)(buffer.getInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -222,7 +223,7 @@ public class EncodingUtils
}
}
- public static byte[] readLongstr(QpidByteBuffer buffer)
+ public static byte[] readLongstr(ByteBuffer buffer)
{
long length = ((long)(buffer.getInt())) & 0xFFFFFFFFL;
if (length == 0)
@@ -242,7 +243,7 @@ public class EncodingUtils
// AMQP_BOOLEAN_PROPERTY_PREFIX
- public static void writeBoolean(QpidByteBuffer buffer, boolean aBoolean)
+ public static void writeBoolean(ByteBuffer buffer, boolean aBoolean)
{
buffer.put(aBoolean ? (byte)1 : (byte)0);
}
@@ -282,9 +283,9 @@ public class EncodingUtils
return 8;
}
- public static byte[] readBytes(QpidByteBuffer buffer)
+ public static byte[] readBytes(ByteBuffer buffer)
{
- long length = buffer.getUnsignedInt();
+ long length = ByteBufferUtils.getUnsignedInt(buffer);
if (length == 0)
{
return null;
@@ -298,17 +299,17 @@ public class EncodingUtils
}
}
- public static void writeBytes(QpidByteBuffer buffer, byte[] data)
+ public static void writeBytes(ByteBuffer buffer, byte[] data)
{
if (data != null)
{
// TODO: check length fits in an unsigned byte
- buffer.putUnsignedInt((long)data.length);
+ ByteBufferUtils.putUnsignedInt(buffer, data.length);
buffer.put(data);
}
else
{
- buffer.putUnsignedInt(0L);
+ ByteBufferUtils.putUnsignedInt(buffer, 0L);
}
}
@@ -318,9 +319,9 @@ public class EncodingUtils
return encodedByteLength();
}
- public static long readLongAsShortString(QpidByteBuffer buffer) throws AMQFrameDecodingException
+ public static long readLongAsShortString(ByteBuffer buffer) throws AMQFrameDecodingException
{
- short length = buffer.getUnsignedByte();
+ short length = ByteBufferUtils.getUnsignedByte(buffer);
short pos = 0;
if (length == 0)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java b/client/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
index 8d5dcfd..880f2ed 100644
--- a/client/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ExchangeBoundBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -84,7 +85,7 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _exchange );
writeAMQShortString( buffer, _routingKey );
@@ -111,7 +112,7 @@ public class ExchangeBoundBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java b/client/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
index 6b3379f..296e7d8 100644
--- a/client/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -82,7 +84,7 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _replyCode );
writeAMQShortString( buffer, _replyText );
@@ -105,11 +107,10 @@ public class ExchangeBoundOkBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
-
- int replyCode = buffer.getUnsignedShort();
+ int replyCode = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString replyText = AMQShortString.readAMQShortString(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java b/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
index 5bded0a..48f583a 100644
--- a/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -144,7 +146,7 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _exchange );
@@ -191,11 +193,11 @@ public class ExchangeDeclareBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher) throws AMQFrameDecodingException
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
AMQShortString type = AMQShortString.readAMQShortString(buffer);
byte bitfield = buffer.get();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareOkBody.java b/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareOkBody.java
index 288c559..9f377d0 100644
--- a/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ExchangeDeclareOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ExchangeDeclareOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -60,7 +61,7 @@ public class ExchangeDeclareOkBody extends AMQMethodBodyImpl implements Encodabl
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java b/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
index 4f69bcf..05dd6c5 100644
--- a/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -97,7 +99,7 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _exchange );
@@ -127,11 +129,11 @@ public class ExchangeDeleteBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
byte bitfield = buffer.get();
boolean ifUnused = (bitfield & 0x01) == 0x01;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteOkBody.java b/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteOkBody.java
index 7fca59e..d6bad7a 100644
--- a/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ExchangeDeleteOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ExchangeDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -60,7 +61,7 @@ public class ExchangeDeleteOkBody extends AMQMethodBodyImpl implements Encodable
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/FieldArray.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/FieldArray.java b/client/src/main/java/org/apache/qpid/framing/FieldArray.java
index 7b7c2ec..33a32f9 100644
--- a/client/src/main/java/org/apache/qpid/framing/FieldArray.java
+++ b/client/src/main/java/org/apache/qpid/framing/FieldArray.java
@@ -21,6 +21,7 @@
package org.apache.qpid.framing;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Arrays;
@@ -31,7 +32,7 @@ import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class FieldArray<T> extends AbstractCollection<T>
{
@@ -104,7 +105,7 @@ public class FieldArray<T> extends AbstractCollection<T>
}
}
- public void writeToBuffer(final QpidByteBuffer buffer)
+ public void writeToBuffer(final ByteBuffer buffer)
{
buffer.putInt(getEncodingSize());
for( T obj : this)
@@ -113,17 +114,16 @@ public class FieldArray<T> extends AbstractCollection<T>
}
}
- public static FieldArray<?> readFromBuffer(final QpidByteBuffer buffer)
+ public static FieldArray<?> readFromBuffer(final ByteBuffer buffer)
{
ArrayList<Object> result = new ArrayList<>();
int size = buffer.getInt();
- QpidByteBuffer slicedBuffer = buffer.view(0,size);
+ ByteBuffer slicedBuffer = ByteBufferUtils.view(buffer, 0, size);
buffer.position(buffer.position()+size);
while(slicedBuffer.hasRemaining())
{
result.add(AMQTypedValue.readFromBuffer(slicedBuffer).getValue());
}
- slicedBuffer.dispose();
return new FieldArray<>(result);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/FieldTable.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/FieldTable.java b/client/src/main/java/org/apache/qpid/framing/FieldTable.java
index ad3c6c6..d6d90e3 100644
--- a/client/src/main/java/org/apache/qpid/framing/FieldTable.java
+++ b/client/src/main/java/org/apache/qpid/framing/FieldTable.java
@@ -21,6 +21,7 @@
package org.apache.qpid.framing;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Collections;
import java.util.Date;
@@ -36,7 +37,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class FieldTable
{
@@ -44,7 +45,7 @@ public class FieldTable
private static final String STRICT_AMQP_NAME = "STRICT_AMQP";
private static final boolean STRICT_AMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP_NAME, "false"));
- private QpidByteBuffer _encodedForm;
+ private ByteBuffer _encodedForm;
private Map<AMQShortString, AMQTypedValue> _properties = null;
private long _encodedSize;
private static final int INITIAL_HASHMAP_CAPACITY = 16;
@@ -73,15 +74,15 @@ public class FieldTable
}
}
- public FieldTable(QpidByteBuffer input, int len)
+ public FieldTable(ByteBuffer input, int len)
{
this();
- _encodedForm = input.view(0,len);
+ _encodedForm = ByteBufferUtils.view(input, 0, len);
input.position(input.position()+len);
_encodedSize = len;
}
- public FieldTable(QpidByteBuffer buffer)
+ public FieldTable(ByteBuffer buffer)
{
this();
_encodedForm = buffer.duplicate();
@@ -813,7 +814,7 @@ public class FieldTable
// ************************* Byte Buffer Processing
- public void writeToBuffer(QpidByteBuffer buffer)
+ public void writeToBuffer(ByteBuffer buffer)
{
final boolean trace = _logger.isDebugEnabled();
@@ -826,7 +827,7 @@ public class FieldTable
}
}
- buffer.putUnsignedInt(getEncodedSize());
+ ByteBufferUtils.putUnsignedInt(buffer, getEncodedSize());
putDataInBuffer(buffer);
}
@@ -837,14 +838,14 @@ public class FieldTable
if(_encodedForm == null)
{
byte[] data = new byte[(int) getEncodedSize()];
- QpidByteBuffer buf = QpidByteBuffer.wrap(data);
+ ByteBuffer buf = ByteBuffer.wrap(data);
putDataInBuffer(buf);
return data;
}
else
{
byte[] encodedCopy = new byte[_encodedForm.remaining()];
- _encodedForm.copyTo(encodedCopy);
+ ByteBufferUtils.copyTo(_encodedForm, encodedCopy);
return encodedCopy;
}
@@ -933,7 +934,6 @@ public class FieldTable
if (_encodedForm != null)
{
- _encodedForm.dispose();
_encodedForm = null;
}
}
@@ -953,7 +953,6 @@ public class FieldTable
if (_encodedForm != null)
{
- _encodedForm.dispose();
_encodedForm = null;
}
}
@@ -1084,7 +1083,6 @@ public class FieldTable
initMapIfNecessary();
if (_encodedForm != null)
{
- _encodedForm.dispose();
_encodedForm = null;
}
_properties.clear();
@@ -1100,14 +1098,11 @@ public class FieldTable
return _properties.keySet();
}
- private void putDataInBuffer(QpidByteBuffer buffer)
+ private void putDataInBuffer(ByteBuffer buffer)
{
if (_encodedForm != null)
{
- byte[] encodedCopy = new byte[_encodedForm.remaining()];
- _encodedForm.copyTo(encodedCopy);
-
- buffer.put(encodedCopy);
+ ByteBufferUtils.copyTo(_encodedForm, buffer);
}
else if (_properties != null)
{
@@ -1137,7 +1132,7 @@ public class FieldTable
private void setFromBuffer() throws AMQFrameDecodingException
{
- final QpidByteBuffer slice = _encodedForm.slice();
+ final ByteBuffer slice = _encodedForm.slice();
if (_encodedSize > 0)
{
@@ -1156,7 +1151,6 @@ public class FieldTable
while (slice.hasRemaining());
}
- slice.dispose();
}
public int hashCode()
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java b/client/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
index 9875cc9..c40529c 100644
--- a/client/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
+++ b/client/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
@@ -20,11 +20,10 @@
*/
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>,
ClientMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>,
ServerMethodProcessor<FrameCreatingMethodProcessor.ClientAndServerChannelMethodProcessor>
@@ -605,7 +604,7 @@ public class FrameCreatingMethodProcessor implements MethodProcessor<FrameCreati
}
@Override
- public void receiveMessageContent(QpidByteBuffer data)
+ public void receiveMessageContent(ByteBuffer data)
{
_processedMethods.add(new AMQFrame(_channelId, new ContentBody(data)));
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/HeartbeatBody.java b/client/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
index cb985f5..a7461ee 100644
--- a/client/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
@@ -22,9 +22,9 @@ package org.apache.qpid.framing;
import java.io.DataInputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
@@ -84,7 +84,7 @@ public class HeartbeatBody implements AMQBody
}
public static void process(final int channel,
- final QpidByteBuffer in,
+ final ByteBuffer in,
final MethodProcessor processor,
final long bodySize)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java b/client/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
index 757ae51..818ef41 100644
--- a/client/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
+++ b/client/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
@@ -21,11 +21,11 @@
package org.apache.qpid.framing;
import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQDataBlock
@@ -61,7 +61,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
}
- public ProtocolInitiation(QpidByteBuffer in)
+ public ProtocolInitiation(ByteBuffer in)
{
_protocolHeader = new byte[4];
in.get(_protocolHeader);
@@ -90,7 +90,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
data[5] = _protocolInstance;
data[6] = _protocolMajor;
data[7] = _protocolMinor;
- sender.send(QpidByteBuffer.wrap(data));
+ sender.send(ByteBuffer.wrap(data));
return 8l;
}
@@ -149,7 +149,7 @@ public class ProtocolInitiation extends AMQDataBlock implements EncodableAMQData
* @param in input buffer
* @return number of extra octets of data required data to decode the PI frame fully
*/
- public int decodable(QpidByteBuffer in)
+ public int decodable(ByteBuffer in)
{
return (in.remaining() >= 8) ? 0 : 8 - in.remaining();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueueBindBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueueBindBody.java b/client/src/main/java/org/apache/qpid/framing/QueueBindBody.java
index e7feebd..e776d96 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueueBindBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueueBindBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -112,7 +114,7 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _queue );
@@ -151,11 +153,11 @@ public class QueueBindBody extends AMQMethodBodyImpl implements EncodableAMQData
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher) throws AMQFrameDecodingException
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString queue = AMQShortString.readAMQShortString(buffer);
AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
AMQShortString bindingKey = AMQShortString.readAMQShortString(buffer);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueueBindOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueueBindOkBody.java b/client/src/main/java/org/apache/qpid/framing/QueueBindOkBody.java
index 13dd32f..f3e5045 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueueBindOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueueBindOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class QueueBindOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -60,7 +61,7 @@ public class QueueBindOkBody extends AMQMethodBodyImpl implements EncodableAMQDa
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java b/client/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
index dbedd5f..76d2377 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueueDeclareBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -136,7 +138,7 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _queue );
@@ -179,11 +181,11 @@ public class QueueDeclareBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher) throws AMQFrameDecodingException
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString queue = AMQShortString.readAMQShortString(buffer);
byte bitfield = buffer.get();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java b/client/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
index bbdd41e..b79921b 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueueDeclareOkBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -82,7 +84,7 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _queue );
writeUnsignedInteger( buffer, _messageCount );
@@ -109,12 +111,12 @@ public class QueueDeclareOkBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
AMQShortString queue = AMQShortString.readAMQShortString(buffer);
- long messageCount = buffer.getUnsignedInt();
- long consumerCount = buffer.getUnsignedInt();
+ long messageCount = ByteBufferUtils.getUnsignedInt(buffer);
+ long consumerCount = ByteBufferUtils.getUnsignedInt(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveQueueDeclareOk(queue, messageCount, consumerCount);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java b/client/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
index 007f309..ed085eb 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueueDeleteBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -107,7 +109,7 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _queue );
@@ -140,11 +142,11 @@ public class QueueDeleteBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString queue = AMQShortString.readAMQShortString(buffer);
byte bitfield = buffer.get();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java b/client/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
index 3fe0f40..8b9b303 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueueDeleteOkBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -67,7 +69,7 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedInteger( buffer, _messageCount );
}
@@ -86,10 +88,10 @@ public class QueueDeleteOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(QpidByteBuffer buffer,
+ public static void process(ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
- long messageCount = buffer.getUnsignedInt();
+ long messageCount = ByteBufferUtils.getUnsignedInt(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveQueueDeleteOk(messageCount);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java b/client/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
index dac9822..d0b91a4 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueuePurgeBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -87,7 +89,7 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _queue );
@@ -114,11 +116,11 @@ public class QueuePurgeBody extends AMQMethodBodyImpl implements EncodableAMQDat
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString queue = AMQShortString.readAMQShortString(buffer);
boolean nowait = (buffer.get() & 0x01) == 0x01;
if(!dispatcher.ignoreAllButCloseOk())
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java b/client/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
index b26a6d8..451f030 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueuePurgeOkBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -67,7 +69,7 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedInteger( buffer, _messageCount );
}
@@ -86,10 +88,10 @@ public class QueuePurgeOkBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
- long messageCount = buffer.getUnsignedInt();
+ long messageCount = ByteBufferUtils.getUnsignedInt(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveQueuePurgeOk(messageCount);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java b/client/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
index 1c2fbb7..12bafd1 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueueUnbindBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -99,7 +101,7 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _queue );
@@ -134,11 +136,11 @@ public class QueueUnbindBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher) throws AMQFrameDecodingException
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString queue = AMQShortString.readAMQShortString(buffer);
AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
AMQShortString routingKey = AMQShortString.readAMQShortString(buffer);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java b/client/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
index 5e0e68e..4a9dd33 100644
--- a/client/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/QueueUnbindOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class QueueUnbindOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -60,7 +61,7 @@ public class QueueUnbindOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/TxCommitBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/TxCommitBody.java b/client/src/main/java/org/apache/qpid/framing/TxCommitBody.java
index 6594a3e..8f8a864 100644
--- a/client/src/main/java/org/apache/qpid/framing/TxCommitBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/TxCommitBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class TxCommitBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -61,7 +62,7 @@ public class TxCommitBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/TxCommitOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/TxCommitOkBody.java b/client/src/main/java/org/apache/qpid/framing/TxCommitOkBody.java
index ebdfa99..c5f9731 100644
--- a/client/src/main/java/org/apache/qpid/framing/TxCommitOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/TxCommitOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class TxCommitOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -59,7 +60,7 @@ public class TxCommitOkBody extends AMQMethodBodyImpl implements EncodableAMQDat
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/TxRollbackBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/TxRollbackBody.java b/client/src/main/java/org/apache/qpid/framing/TxRollbackBody.java
index 2576a38..881191c 100644
--- a/client/src/main/java/org/apache/qpid/framing/TxRollbackBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/TxRollbackBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class TxRollbackBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -59,7 +60,7 @@ public class TxRollbackBody extends AMQMethodBodyImpl implements EncodableAMQDat
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/TxRollbackOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/TxRollbackOkBody.java b/client/src/main/java/org/apache/qpid/framing/TxRollbackOkBody.java
index 38189fd..816269d 100644
--- a/client/src/main/java/org/apache/qpid/framing/TxRollbackOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/TxRollbackOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class TxRollbackOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -60,7 +61,7 @@ public class TxRollbackOkBody extends AMQMethodBodyImpl implements EncodableAMQD
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/TxSelectBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/TxSelectBody.java b/client/src/main/java/org/apache/qpid/framing/TxSelectBody.java
index 184a692..d5752f9 100644
--- a/client/src/main/java/org/apache/qpid/framing/TxSelectBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/TxSelectBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class TxSelectBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -62,7 +63,7 @@ public class TxSelectBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java b/client/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java
index d8e8c8c..82d14c2 100644
--- a/client/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/TxSelectOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class TxSelectOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -58,7 +59,7 @@ public class TxSelectOkBody extends AMQMethodBodyImpl implements EncodableAMQDat
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/ByteBufferSender.java b/client/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
index aac4ebd..7dcaf61 100644
--- a/client/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
+++ b/client/src/main/java/org/apache/qpid/transport/ByteBufferSender.java
@@ -20,13 +20,11 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import java.nio.ByteBuffer;
public interface ByteBufferSender
{
- boolean isDirectBufferPreferred();
-
- void send(QpidByteBuffer msg);
+ void send(ByteBuffer msg);
void flush();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/MessageTransfer.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/MessageTransfer.java b/client/src/main/java/org/apache/qpid/transport/MessageTransfer.java
index cd0d255..53d9940 100644
--- a/client/src/main/java/org/apache/qpid/transport/MessageTransfer.java
+++ b/client/src/main/java/org/apache/qpid/transport/MessageTransfer.java
@@ -21,19 +21,14 @@ package org.apache.qpid.transport;
*/
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
+import java.nio.ByteBuffer;
import java.util.LinkedHashMap;
-import java.util.List;
import java.util.Map;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.codec.Decoder;
import org.apache.qpid.transport.codec.Encoder;
-
-import org.apache.qpid.util.Strings;
import org.apache.qpid.transport.network.Frame;
+import org.apache.qpid.util.Strings;
public final class MessageTransfer extends Method {
@@ -71,22 +66,13 @@ public final class MessageTransfer extends Method {
private MessageAcceptMode acceptMode;
private MessageAcquireMode acquireMode;
private Header header;
- private Collection<QpidByteBuffer> _body;
+ private ByteBuffer _body;
public MessageTransfer() {}
public MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, java.nio.ByteBuffer body, Option ... options)
{
- this(destination,
- acceptMode,
- acquireMode,
- header,
- Collections.singletonList(QpidByteBuffer.wrap(body)),
- options);
- }
-
- public MessageTransfer(String destination, MessageAcceptMode acceptMode, MessageAcquireMode acquireMode, Header header, Collection<QpidByteBuffer> body, Option ... _options) {
if(destination != null) {
setDestination(destination);
}
@@ -99,13 +85,13 @@ public final class MessageTransfer extends Method {
setHeader(header);
setBody(body);
- for (int i=0; i < _options.length; i++) {
- switch (_options[i]) {
+ for (int i=0; i < options.length; i++) {
+ switch (options[i]) {
case SYNC: this.setSync(true); break;
case BATCH: this.setBatch(true); break;
case UNRELIABLE: this.setUnreliable(true); break;
case NONE: break;
- default: throw new IllegalArgumentException("invalid option: " + _options[i]);
+ default: throw new IllegalArgumentException("invalid option: " + options[i]);
}
}
@@ -209,19 +195,13 @@ public final class MessageTransfer extends Method {
}
@Override
- public final Collection<QpidByteBuffer> getBody() {
- if (this._body == null)
- {
- return null;
- }
- else
- {
- return Collections.unmodifiableCollection(_body);
- }
+ public final ByteBuffer getBody()
+ {
+ return _body;
}
@Override
- public final void setBody(Collection<QpidByteBuffer> body)
+ public final void setBody(ByteBuffer body)
{
if (body == null)
{
@@ -230,14 +210,8 @@ public final class MessageTransfer extends Method {
}
else
{
- _body = new ArrayList<>(body.size());
- int size = 0;
- for (QpidByteBuffer buf : body)
- {
- size += buf.remaining();
- _body.add(buf.duplicate());
- }
- _bodySize = size;
+ _body = body.duplicate();
+ _bodySize = _body.remaining();
}
}
@@ -247,25 +221,16 @@ public final class MessageTransfer extends Method {
return _bodySize;
}
- public final MessageTransfer body(List<QpidByteBuffer> body)
+ public final byte[] getBodyBytes()
{
- setBody(body);
- return this;
- }
-
- public final byte[] getBodyBytes() {
- Collection<QpidByteBuffer> body = getBody();
byte[] bytes = new byte[getBodySize()];
- for(QpidByteBuffer buf : body)
- {
- buf.duplicate().get(bytes);
- }
+ _body.duplicate().get(bytes);
return bytes;
}
public final void setBody(byte[] body)
{
- setBody(Collections.singletonList(QpidByteBuffer.wrap(body)));
+ setBody(ByteBuffer.wrap(body));
}
public final String getBodyString() {
@@ -338,10 +303,7 @@ public final class MessageTransfer extends Method {
{
if (_body != null)
{
- for (QpidByteBuffer buf : _body)
- {
- buf.dispose();
- }
+ _body = null;
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/Method.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/Method.java b/client/src/main/java/org/apache/qpid/transport/Method.java
index d23ce72..744556a 100644
--- a/client/src/main/java/org/apache/qpid/transport/Method.java
+++ b/client/src/main/java/org/apache/qpid/transport/Method.java
@@ -20,12 +20,11 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.transport.network.Frame;
-
import static org.apache.qpid.transport.util.Functions.str;
-import java.util.Collection;
+import java.nio.ByteBuffer;
+
+import org.apache.qpid.transport.network.Frame;
/**
* Method
@@ -126,12 +125,12 @@ public abstract class Method extends Struct implements ProtocolEvent
throw new UnsupportedOperationException();
}
- public Collection<QpidByteBuffer> getBody()
+ public ByteBuffer getBody()
{
return null;
}
- public void setBody(Collection<QpidByteBuffer> body)
+ public void setBody(ByteBuffer body)
{
throw new UnsupportedOperationException();
}
@@ -217,7 +216,7 @@ public abstract class Method extends Struct implements ProtocolEvent
str.append(st);
}
}
- Collection<QpidByteBuffer> body = getBody();
+ ByteBuffer body = getBody();
if (body != null)
{
str.append("\n body=");
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/ProtocolHeader.java b/client/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
index f0d4887..a9481ac 100644
--- a/client/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
+++ b/client/src/main/java/org/apache/qpid/transport/ProtocolHeader.java
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.transport;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.transport.network.Frame;
import org.apache.qpid.transport.network.NetworkDelegate;
import org.apache.qpid.transport.network.NetworkEvent;
@@ -92,9 +93,9 @@ public final class ProtocolHeader implements NetworkEvent, ProtocolEvent
return false;
}
- public QpidByteBuffer toByteBuffer()
+ public ByteBuffer toByteBuffer()
{
- QpidByteBuffer buf = QpidByteBuffer.allocate(8);
+ ByteBuffer buf = ByteBuffer.allocate(8);
buf.put(AMQP);
buf.put(protoClass);
buf.put(instance);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/network/Assembler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/network/Assembler.java b/client/src/main/java/org/apache/qpid/transport/network/Assembler.java
index 931b629..0f383b9 100644
--- a/client/src/main/java/org/apache/qpid/transport/network/Assembler.java
+++ b/client/src/main/java/org/apache/qpid/transport/network/Assembler.java
@@ -22,12 +22,10 @@ package org.apache.qpid.transport.network;
import java.nio.ByteBuffer;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageProperties;
@@ -243,7 +241,7 @@ public class Assembler implements NetworkEventReceiver, NetworkDelegate
break;
case BODY:
command = getIncompleteCommand(channel);
- command.setBody(Collections.singletonList(QpidByteBuffer.wrap(segment)));
+ command.setBody(segment);
setIncompleteCommand(channel, null);
emit(channel, command);
break;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/network/Disassembler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/network/Disassembler.java b/client/src/main/java/org/apache/qpid/transport/network/Disassembler.java
index 47f9d7a..785e252 100644
--- a/client/src/main/java/org/apache/qpid/transport/network/Disassembler.java
+++ b/client/src/main/java/org/apache/qpid/transport/network/Disassembler.java
@@ -28,14 +28,10 @@ import static org.apache.qpid.transport.network.Frame.LAST_FRAME;
import static org.apache.qpid.transport.network.Frame.LAST_SEG;
import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.FrameSizeObserver;
import org.apache.qpid.transport.Header;
@@ -48,6 +44,7 @@ import org.apache.qpid.transport.ProtocolHeader;
import org.apache.qpid.transport.SegmentType;
import org.apache.qpid.transport.Struct;
import org.apache.qpid.transport.codec.BBEncoder;
+import org.apache.qpid.util.ByteBufferUtils;
/**
* Disassembler
@@ -172,47 +169,31 @@ public final class Disassembler implements ProtocolEventSender, ProtocolDelegate
{
ByteBuffer buf = enc.underlyingBuffer();
buf.flip();
- QpidByteBuffer copy = QpidByteBuffer.allocate(_sender.isDirectBufferPreferred(), buf.remaining());
- copy.putCopyOf(QpidByteBuffer.wrap(buf));
+ ByteBuffer copy = ByteBuffer.allocate(buf.remaining());
+ ByteBufferUtils.copyTo(buf, copy);
copy.flip();
- final QpidByteBuffer methodBuf = copy.view(0, methodLimit);
- fragment(flags, type, method, Collections.singletonList(methodBuf));
- methodBuf.dispose();
+ final ByteBuffer methodBuf = ByteBufferUtils.view(copy,0, methodLimit);
+ fragment(flags, type, method, methodBuf);
if (payload)
{
- Collection<QpidByteBuffer> bodies = method.getBody();
- QpidByteBuffer headerBuf = copy.view(methodLimit, headerLimit);
- fragment(bodies == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, Collections.singletonList(headerBuf));
- headerBuf.dispose();
- if (bodies != null)
+ ByteBuffer body = method.getBody();
+ ByteBuffer headerBuf = ByteBufferUtils.view(copy, methodLimit, headerLimit);
+ fragment(body == null ? LAST_SEG : 0x0, SegmentType.HEADER, method, headerBuf);
+ if (body != null)
{
- Collection<QpidByteBuffer> dup = new ArrayList<>(bodies.size());
- for(QpidByteBuffer b : bodies)
- {
- dup.add(b.duplicate());
- }
- fragment(LAST_SEG, SegmentType.BODY, method, dup);
- for(QpidByteBuffer b : dup)
- {
- b.dispose();
- }
+ fragment(LAST_SEG, SegmentType.BODY, method, body.duplicate());
}
}
- copy.dispose();
}
}
- private void fragment(byte flags, SegmentType type, ProtocolEvent event, Collection<QpidByteBuffer> buffers)
+ private void fragment(byte flags, SegmentType type, ProtocolEvent event, ByteBuffer buffer)
{
byte typeb = (byte) type.getValue();
byte track = event.getEncodedTrack() == Frame.L4 ? (byte) 1 : (byte) 0;
- int remaining = 0;
- for(QpidByteBuffer b : buffers)
- {
- remaining += b.remaining();
- }
+ int remaining = buffer.remaining();
boolean first = true;
while (true)
{
@@ -230,7 +211,7 @@ public final class Disassembler implements ProtocolEventSender, ProtocolDelegate
newflags |= LAST_FRAME;
}
- frame(newflags, typeb, track, event.getChannel(), size, buffers);
+ frame(newflags, typeb, track, event.getChannel(), size, buffer);
if (remaining == 0)
{
@@ -239,9 +220,9 @@ public final class Disassembler implements ProtocolEventSender, ProtocolDelegate
}
}
- private void frame(byte flags, byte type, byte track, int channel, int size, Collection<QpidByteBuffer> buffers)
+ private void frame(byte flags, byte type, byte track, int channel, int size, ByteBuffer buffer)
{
- QpidByteBuffer data = QpidByteBuffer.allocate(_sender.isDirectBufferPreferred(), HEADER_SIZE);
+ ByteBuffer data = ByteBuffer.allocate(HEADER_SIZE);
data.put(0, flags);
data.put(1, type);
@@ -252,31 +233,12 @@ public final class Disassembler implements ProtocolEventSender, ProtocolDelegate
_sender.send(data);
- data.dispose();
if(size > 0)
{
- int residual = size;
- for(QpidByteBuffer b : buffers)
- {
- final int remaining = b.remaining();
- if(remaining > 0 )
- {
- if(remaining >= residual)
- {
- final QpidByteBuffer buffer = b.view(0, residual);
- _sender.send(buffer);
- buffer.dispose();
- b.position(b.position() + residual);
- break;
- }
- else
- {
- _sender.send(b);
- residual-=remaining;
- }
- }
- }
+ final ByteBuffer view = ByteBufferUtils.view(buffer, 0, size);
+ _sender.send(view);
+ buffer.position(buffer.position() + size);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/network/io/IoSender.java b/client/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
index 3ab0601..b17e3de 100644
--- a/client/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
+++ b/client/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
@@ -23,6 +23,7 @@ import static org.apache.qpid.transport.util.Functions.mod;
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.net.ssl.SSLSocket;
@@ -30,7 +31,6 @@ import javax.net.ssl.SSLSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.thread.Threading;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderClosedException;
@@ -117,13 +117,7 @@ public final class IoSender implements Runnable, ByteBufferSender
return result;
}
- @Override
- public boolean isDirectBufferPreferred()
- {
- return false;
- }
-
- public void send(QpidByteBuffer buf)
+ public void send(ByteBuffer buf)
{
checkNotAlreadyClosed();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java b/client/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
index 16d5a00..b7802f7 100644
--- a/client/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
+++ b/client/src/main/java/org/apache/qpid/transport/network/security/sasl/SASLSender.java
@@ -21,6 +21,7 @@
package org.apache.qpid.transport.network.security.sasl;
+import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.security.sasl.SaslException;
@@ -28,7 +29,6 @@ import javax.security.sasl.SaslException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderException;
@@ -71,13 +71,7 @@ public class SASLSender extends SASLEncryptor implements ByteBufferSender
delegate.flush();
}
- @Override
- public boolean isDirectBufferPreferred()
- {
- return false;
- }
-
- public void send(QpidByteBuffer buf)
+ public void send(ByteBuffer buf)
{
if (closed.get())
{
@@ -98,7 +92,7 @@ public class SASLSender extends SASLEncryptor implements ByteBufferSender
byte[] out = getSaslClient().wrap(appData, 0, length);
LOGGER.debug("out.length {}", out.length);
- delegate.send(QpidByteBuffer.wrap(out));
+ delegate.send(ByteBuffer.wrap(out));
}
catch (SaslException e)
{
@@ -111,7 +105,6 @@ public class SASLSender extends SASLEncryptor implements ByteBufferSender
{
delegate.send(buf);
}
- buf.dispose();
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java b/client/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
index 236b68d..a8c8eb5 100644
--- a/client/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
+++ b/client/src/main/java/org/apache/qpid/transport/network/security/ssl/SSLSender.java
@@ -19,6 +19,7 @@
*/
package org.apache.qpid.transport.network.security.ssl;
+import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -32,10 +33,10 @@ import javax.net.ssl.SSLException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
import org.apache.qpid.transport.SenderException;
import org.apache.qpid.transport.network.security.SSLStatus;
+import org.apache.qpid.util.ByteBufferUtils;
public class SSLSender implements ByteBufferSender
{
@@ -44,14 +45,14 @@ public class SSLSender implements ByteBufferSender
private final ByteBufferSender delegate;
private final SSLEngine engine;
private final int sslBufSize;
- private final QpidByteBuffer netData;
+ private final ByteBuffer netData;
private final long timeout;
private final SSLStatus _sslStatus;
private String _hostname;
private final AtomicBoolean closed = new AtomicBoolean(false);
- private final ConcurrentLinkedQueue<QpidByteBuffer> _pending = new ConcurrentLinkedQueue<>();
+ private final ConcurrentLinkedQueue<ByteBuffer> _pending = new ConcurrentLinkedQueue<>();
public SSLSender(SSLEngine engine, ByteBufferSender delegate, SSLStatus sslStatus)
@@ -59,7 +60,7 @@ public class SSLSender implements ByteBufferSender
this.engine = engine;
this.delegate = delegate;
sslBufSize = engine.getSession().getPacketBufferSize();
- netData = QpidByteBuffer.allocate(sslBufSize);
+ netData = ByteBuffer.allocate(sslBufSize);
timeout = Long.getLong("qpid.ssl_timeout", 60000);
_sslStatus = sslStatus;
}
@@ -111,9 +112,9 @@ public class SSLSender implements ByteBufferSender
private void tearDownSSLConnection() throws Exception
{
- SSLEngineResult result = QpidByteBuffer.encryptSSL(engine,
- Collections.singletonList(QpidByteBuffer.allocate(0)),
- netData);
+ SSLEngineResult result = ByteBufferUtils.encryptSSL(engine,
+ Collections.singletonList(ByteBuffer.allocate(0)),
+ netData);
Status status = result.getStatus();
int read = result.bytesProduced();
while (status != Status.CLOSED)
@@ -128,7 +129,7 @@ public class SSLSender implements ByteBufferSender
netData.limit(netData.position());
netData.position(netData.position() - read);
- QpidByteBuffer data = netData.slice();
+ ByteBuffer data = netData.slice();
netData.limit(limit);
netData.position(netData.position() + read);
@@ -136,9 +137,9 @@ public class SSLSender implements ByteBufferSender
delegate.send(data);
flush();
}
- result = QpidByteBuffer.encryptSSL(engine,
- Collections.singletonList(QpidByteBuffer.allocate(0)),
- netData);
+ result = ByteBufferUtils.encryptSSL(engine,
+ Collections.singletonList(ByteBuffer.allocate(0)),
+ netData);
status = result.getStatus();
read = result.bytesProduced();
}
@@ -151,13 +152,7 @@ public class SSLSender implements ByteBufferSender
}
- @Override
- public boolean isDirectBufferPreferred()
- {
- return false;
- }
-
- public void send(QpidByteBuffer appData)
+ public void send(ByteBuffer appData)
{
_pending.add(appData.duplicate());
@@ -178,16 +173,15 @@ public class SSLSender implements ByteBufferSender
int read = 0;
try
{
- SSLEngineResult result = QpidByteBuffer.encryptSSL(engine, _pending, netData);
+ SSLEngineResult result = ByteBufferUtils.encryptSSL(engine, _pending, netData);
while(!_pending.isEmpty())
{
- QpidByteBuffer buf = _pending.peek();
+ ByteBuffer buf = _pending.peek();
if (buf.hasRemaining())
{
break;
}
- buf.dispose();
_pending.poll();
}
@@ -207,7 +201,7 @@ public class SSLSender implements ByteBufferSender
netData.limit(netData.position());
netData.position(netData.position() - read);
- QpidByteBuffer data = netData.slice();
+ ByteBuffer data = netData.slice();
netData.limit(limit);
netData.position(netData.position() + read);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/transport/util/Functions.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/transport/util/Functions.java b/client/src/main/java/org/apache/qpid/transport/util/Functions.java
index 68523f7..3c60f1b 100644
--- a/client/src/main/java/org/apache/qpid/transport/util/Functions.java
+++ b/client/src/main/java/org/apache/qpid/transport/util/Functions.java
@@ -23,10 +23,6 @@ package org.apache.qpid.transport.util;
import static java.lang.Math.min;
import java.nio.ByteBuffer;
-import java.util.Collection;
-
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-import org.apache.qpid.util.ByteBufferUtils;
/**
@@ -72,32 +68,6 @@ public final class Functions
public static final String str(ByteBuffer buf, int limit,int start)
{
- return str(QpidByteBuffer.wrap(buf), limit, start);
- }
-
- public static final String str(QpidByteBuffer buf)
- {
- return str(buf, buf.remaining());
- }
-
- public static final String str(QpidByteBuffer buf, int limit)
- {
- return str(buf, limit, buf.position());
- }
-
- public static final String str(Collection<QpidByteBuffer> buf, int limit, int start)
- {
- return str(ByteBufferUtils.combine(buf),limit,start);
- }
-
-
- public static final String str(Collection<QpidByteBuffer> buf, int limit)
- {
- return str(buf, limit, 0);
- }
-
- public static final String str(QpidByteBuffer buf, int limit, int start)
- {
StringBuilder str = new StringBuilder();
str.append('"');
@@ -139,10 +109,6 @@ public final class Functions
{
return hex(bytes, limit, "");
}
- public static String hex(QpidByteBuffer bytes, int limit)
- {
- return hex(bytes, limit, "");
- }
public static String hex(byte[] bytes, int limit, CharSequence separator)
{
@@ -163,25 +129,4 @@ public final class Functions
}
return sb.toString();
}
-
- public static String hex(QpidByteBuffer bytes, int limit, CharSequence separator)
- {
- limit = Math.min(limit, bytes == null ? 0 : bytes.remaining());
- StringBuilder sb = new StringBuilder(3 + limit*2);
- for(int i = 0; i < limit; i++)
- {
- sb.append(HEX_CHARACTERS[(((int)(bytes.get(bytes.position()+i))) & 0xf0)>>4]);
- sb.append(HEX_CHARACTERS[(((int)bytes.get(bytes.position()+i)) & 0x0f)]);
- if(i != bytes.remaining() - 1)
- {
- sb.append(separator);
- }
- }
- if(bytes != null && bytes.remaining()>limit)
- {
- sb.append("...");
- }
- return sb.toString();
- }
-
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/util/ByteBufferUtils.java b/client/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
index ce1ae1c..0f31c93 100644
--- a/client/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
+++ b/client/src/main/java/org/apache/qpid/util/ByteBufferUtils.java
@@ -23,13 +23,16 @@ package org.apache.qpid.util;
import java.nio.ByteBuffer;
import java.util.Collection;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
public class ByteBufferUtils
{
private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
+ private static final ByteBuffer[] EMPTY_BYTE_BUFFER_ARRAY = new ByteBuffer[0];
- public static ByteBuffer combine(Collection<QpidByteBuffer> bufs)
+ public static ByteBuffer combine(Collection<ByteBuffer> bufs)
{
if(bufs == null || bufs.isEmpty())
{
@@ -39,33 +42,88 @@ public class ByteBufferUtils
{
int size = 0;
boolean isDirect = false;
- for(QpidByteBuffer buf : bufs)
+ for(ByteBuffer buf : bufs)
{
size += buf.remaining();
isDirect = isDirect || buf.isDirect();
}
ByteBuffer combined = isDirect ? ByteBuffer.allocateDirect(size) : ByteBuffer.allocate(size);
- for(QpidByteBuffer buf : bufs)
+ for(ByteBuffer buf : bufs)
{
- buf.copyTo(combined);
+ copyTo(buf, combined);
}
combined.flip();
return combined;
}
}
- public static int remaining(Collection<QpidByteBuffer> bufs)
+
+ public static long getUnsignedInt(ByteBuffer buffer)
{
- int size = 0;
- if (bufs != null && !bufs.isEmpty())
- {
- for (QpidByteBuffer buf : bufs)
- {
- size += buf.remaining();
- }
+ return ((long) buffer.getInt()) & 0xffffffffL;
+ }
+
+ public static void putUnsignedInt(ByteBuffer buffer, long value)
+ {
+ buffer.putInt((int) value);
+ }
+
+ public static int getUnsignedShort(ByteBuffer buffer)
+ {
+ return ((int) buffer.getShort()) & 0xffff;
+ }
+
+ public static void putUnsignedShort(ByteBuffer buffer, int value)
+ {
+ buffer.putShort((short) value);
+ }
+
+ public static short getUnsignedByte(ByteBuffer buffer)
+ {
+ return (short) (((short) buffer.get()) & 0xff);
+ }
+ public static void putUnsignedByte(ByteBuffer buffer, short value)
+ {
+ buffer.put((byte) value);
+ }
+
+ public static ByteBuffer view(ByteBuffer buffer, int offset, int length)
+ {
+ ByteBuffer view = buffer.slice();
+ view.position(offset);
+ int newLimit = Math.min(view.position() + length, view.capacity());
+ view.limit(newLimit);
+ return view.slice();
+ }
+
+ public static void copyTo(ByteBuffer src, byte[] dst)
+ {
+ ByteBuffer copy = src.duplicate();
+ copy.get(dst);
+ }
+
+ public static void copyTo(ByteBuffer src, ByteBuffer dst)
+ {
+ ByteBuffer copy = src.duplicate();
+ dst.put(copy);
+ }
+
+ public static SSLEngineResult encryptSSL(SSLEngine engine,
+ final Collection<ByteBuffer> buffers,
+ ByteBuffer dest) throws SSLException
+ {
+ final ByteBuffer[] src;
+ // QPID-7447: prevent unnecessary allocations
+ if (buffers.isEmpty())
+ {
+ src = EMPTY_BYTE_BUFFER_ARRAY;
+ }
+ else
+ {
+ src = buffers.toArray(new ByteBuffer[buffers.size()]);
}
- return size;
+ return engine.wrap(src, dest);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/4] qpid-jms-amqp-0-x git commit: QPID-7725: [Java Client,
AMQP 0-x] Remove QpidByteBuffer
Posted by lq...@apache.org.
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AMQType.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AMQType.java b/client/src/main/java/org/apache/qpid/framing/AMQType.java
index b42efed..24594ce 100644
--- a/client/src/main/java/org/apache/qpid/framing/AMQType.java
+++ b/client/src/main/java/org/apache/qpid/framing/AMQType.java
@@ -21,9 +21,10 @@
package org.apache.qpid.framing;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.Collection;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
/**
* AMQType is a type that represents the different possible AMQP field table types. It provides operations for each
@@ -51,12 +52,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLongString(buffer);
}
@@ -98,14 +99,14 @@ public enum AMQType
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
buffer.putLong( (Long) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
- return buffer.getUnsignedInt();
+ return ByteBufferUtils.getUnsignedInt(buffer);
}
},
@@ -130,7 +131,7 @@ public enum AMQType
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
BigDecimal bd = (BigDecimal) value;
@@ -143,7 +144,7 @@ public enum AMQType
buffer.putInt(unscaled);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
byte places = buffer.get();
@@ -175,12 +176,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
buffer.putLong ((Long) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return buffer.getLong();
}
@@ -233,7 +234,7 @@ public enum AMQType
return (FieldTable) value;
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
// Ensure that the value is a FieldTable.
if (!(value instanceof FieldTable))
@@ -253,7 +254,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
try
{
@@ -295,7 +296,7 @@ public enum AMQType
return FieldArray.asFieldArray((Collection)value);
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
if (!(value instanceof FieldArray))
@@ -316,7 +317,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
// Read size of field table then all name/value pairs.
return FieldArray.readFromBuffer(buffer);
@@ -343,10 +344,10 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{ }
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return null;
}
@@ -372,12 +373,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLongstr(buffer);
}
@@ -402,12 +403,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLongString(buffer);
}
@@ -433,12 +434,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLongString(buffer);
}
@@ -468,12 +469,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeBoolean(buffer, (Boolean) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return buffer.get() == 1;
}
@@ -503,13 +504,13 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
char charVal = (Character) value;
buffer.put((byte) charVal);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return (char) buffer.get();
}
@@ -539,12 +540,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
buffer.put((Byte) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return buffer.get();
}
@@ -578,12 +579,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
buffer.putShort((Short) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return buffer.getShort();
}
@@ -620,11 +621,11 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
buffer.putInt((Integer) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return buffer.getInt();
}
@@ -666,12 +667,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
buffer.putLong ((Long) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return buffer.getLong();
}
@@ -701,12 +702,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
buffer.putFloat ((Float) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return buffer.getFloat();
}
@@ -740,12 +741,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, QpidByteBuffer buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
buffer.putDouble((Double) value);
}
- public Object readValueFromBuffer(QpidByteBuffer buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return buffer.getDouble();
}
@@ -805,13 +806,13 @@ public enum AMQType
return AMQTypedValue.createAMQTypedValue(this, toNativeValue(value));
}
- public void writeToBuffer(Object value, QpidByteBuffer buffer)
+ public void writeToBuffer(Object value, ByteBuffer buffer)
{
buffer.put(identifier());
writeValueImpl(value, buffer);
}
- abstract void writeValueImpl(Object value, QpidByteBuffer buffer);
+ abstract void writeValueImpl(Object value, ByteBuffer buffer);
/**
* Reads an instance of the type from a specified byte buffer.
@@ -820,5 +821,5 @@ public enum AMQType
*
* @return An instance of the type.
*/
- abstract Object readValueFromBuffer(QpidByteBuffer buffer);
+ abstract Object readValueFromBuffer(ByteBuffer buffer);
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AMQTypedValue.java b/client/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
index dd4605e..d71e89b 100644
--- a/client/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
+++ b/client/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
@@ -21,12 +21,11 @@
package org.apache.qpid.framing;
import java.math.BigDecimal;
+import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.Date;
import java.util.Map;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
-
/**
* AMQTypedValue combines together a native Java Object value, and an {@link AMQType}, as a fully typed AMQP parameter
* value. It provides the ability to read and write fully typed parameters to and from byte buffers. It also provides
@@ -40,7 +39,7 @@ public abstract class AMQTypedValue
public abstract Object getValue();
- public abstract void writeToBuffer(QpidByteBuffer buffer);
+ public abstract void writeToBuffer(ByteBuffer buffer);
public abstract int getEncodingSize();
@@ -64,7 +63,7 @@ public abstract class AMQTypedValue
_value = type.toNativeValue(value);
}
- private GenericTypedValue(AMQType type, QpidByteBuffer buffer)
+ private GenericTypedValue(AMQType type, ByteBuffer buffer)
{
_type = type;
_value = type.readValueFromBuffer(buffer);
@@ -81,7 +80,7 @@ public abstract class AMQTypedValue
return _value;
}
- public void writeToBuffer(QpidByteBuffer buffer)
+ public void writeToBuffer(ByteBuffer buffer)
{
_type.writeToBuffer(_value, buffer);
}
@@ -128,7 +127,7 @@ public abstract class AMQTypedValue
_value = value;
}
- public LongTypedValue(QpidByteBuffer buffer)
+ public LongTypedValue(ByteBuffer buffer)
{
_value = buffer.getLong();
}
@@ -144,7 +143,7 @@ public abstract class AMQTypedValue
return _value;
}
- public void writeToBuffer(QpidByteBuffer buffer)
+ public void writeToBuffer(ByteBuffer buffer)
{
buffer.put(AMQType.LONG.identifier());
buffer.putLong(_value);
@@ -183,7 +182,7 @@ public abstract class AMQTypedValue
return _value;
}
- public void writeToBuffer(QpidByteBuffer buffer)
+ public void writeToBuffer(ByteBuffer buffer)
{
buffer.put(AMQType.INT.identifier());
buffer.putInt(_value);
@@ -196,7 +195,7 @@ public abstract class AMQTypedValue
}
- public static AMQTypedValue readFromBuffer(QpidByteBuffer buffer)
+ public static AMQTypedValue readFromBuffer(ByteBuffer buffer)
{
AMQType type = AMQTypeMap.getType(buffer.get());
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AccessRequestBody.java b/client/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
index 9d2c535..0291c1b 100644
--- a/client/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/AccessRequestBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -120,7 +121,7 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _realm );
writeBitfield( buffer, _bitfield0 );
@@ -155,7 +156,7 @@ public class AccessRequestBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
AMQShortString realm = AMQShortString.readAMQShortString(buffer);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java b/client/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
index bffaa21..d03094d 100644
--- a/client/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/AccessRequestOkBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -67,7 +69,7 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
}
@@ -86,10 +88,10 @@ public class AccessRequestOkBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveAccessRequestOk(ticket);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicAckBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicAckBody.java b/client/src/main/java/org/apache/qpid/framing/BasicAckBody.java
index 9a832a2..410c7d4 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicAckBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicAckBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -79,7 +80,7 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeLong( buffer, _deliveryTag );
writeBitfield( buffer, _bitfield0 );
@@ -102,7 +103,7 @@ public class BasicAckBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ChannelMethodProcessor dispatcher)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicCancelBody.java b/client/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
index 7890127..5af85e2 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicCancelBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -80,7 +81,7 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _consumerTag );
writeBitfield( buffer, _bitfield0 );
@@ -103,7 +104,7 @@ public class BasicCancelBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java b/client/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
index a2f0dfc..0434d5c 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicCancelOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -68,7 +69,7 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _consumerTag );
}
@@ -87,7 +88,7 @@ public class BasicCancelOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(QpidByteBuffer in,
+ public static void process(ByteBuffer in,
final ClientChannelMethodProcessor dispatcher)
{
AMQShortString consumerTag = AMQShortString.readAMQShortString(in);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java b/client/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
index c9492d4..bca5473 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicConsumeBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -134,7 +136,7 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _queue );
@@ -178,12 +180,12 @@ public class BasicConsumeBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
throws AMQFrameDecodingException
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString queue = AMQShortString.readAMQShortString(buffer);
AMQShortString consumerTag = AMQShortString.readAMQShortString(buffer);
byte bitfield = buffer.get();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java b/client/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
index 0b5d120..a24f0e2 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicConsumeOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -68,7 +69,7 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _consumerTag );
}
@@ -87,7 +88,7 @@ public class BasicConsumeOkBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
AMQShortString consumerTag = AMQShortString.readAMQShortString(buffer);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java b/client/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
index 4ba8d59..ec4512e 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferUtils;
public class BasicContentHeaderProperties
{
@@ -80,7 +82,7 @@ public class BasicContentHeaderProperties
private static final int APPLICATION_ID_MASK = 1 << 3;
private static final int CLUSTER_ID_MASK = 1 << 2;
- private QpidByteBuffer _encodedForm;
+ private ByteBuffer _encodedForm;
public BasicContentHeaderProperties(BasicContentHeaderProperties other)
@@ -224,11 +226,11 @@ public class BasicContentHeaderProperties
return _propertyFlags;
}
- public synchronized long writePropertyListPayload(QpidByteBuffer buffer)
+ public synchronized long writePropertyListPayload(ByteBuffer buffer)
{
if(useEncodedForm())
{
- buffer.putCopyOf(_encodedForm);
+ ByteBufferUtils.copyTo(_encodedForm, buffer);
return _encodedForm.remaining();
}
@@ -317,10 +319,10 @@ public class BasicContentHeaderProperties
}
}
- public int read(QpidByteBuffer input)
+ public int read(ByteBuffer input)
{
- _propertyFlags = input.getUnsignedShort();
+ _propertyFlags = ByteBufferUtils.getUnsignedShort(input);
int length = 2;
if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
{
@@ -459,25 +461,22 @@ public class BasicContentHeaderProperties
{
if(useEncodedForm())
{
- final QpidByteBuffer duplicate = _encodedForm.duplicate();
- sender.send(duplicate);
- duplicate.dispose();
+ sender.send(_encodedForm.duplicate());
return _encodedForm.remaining();
}
else
{
int propertyListSize = getPropertyListSize();
- QpidByteBuffer buf = QpidByteBuffer.allocate(sender.isDirectBufferPreferred(), propertyListSize);
+ ByteBuffer buf = ByteBuffer.allocate(propertyListSize);
writePropertyListPayload(buf);
buf.flip();
sender.send(buf);
- buf.dispose();
return propertyListSize;
}
}
- public synchronized void populatePropertiesFromBuffer(QpidByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException
+ public synchronized void populatePropertiesFromBuffer(ByteBuffer buffer, int propertyFlags, int size) throws AMQFrameDecodingException
{
_propertyFlags = propertyFlags;
@@ -485,20 +484,14 @@ public class BasicContentHeaderProperties
{
_logger.debug("Property flags: " + _propertyFlags);
}
- if(_encodedForm != null)
- {
- _encodedForm.dispose();
- }
- _encodedForm = buffer.view(0,size);
+ _encodedForm = ByteBufferUtils.view(buffer, 0, size);
- final QpidByteBuffer byteBuffer = _encodedForm.slice();
- decode(byteBuffer);
- byteBuffer.dispose();
+ decode(_encodedForm.slice());
buffer.position(buffer.position()+size);
}
- private void decode(QpidByteBuffer buffer) throws AMQFrameDecodingException
+ private void decode(ByteBuffer buffer) throws AMQFrameDecodingException
{
if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
{
@@ -512,11 +505,10 @@ public class BasicContentHeaderProperties
if ((_propertyFlags & HEADERS_MASK) != 0)
{
- long length = buffer.getUnsignedInt();
+ long length = ByteBufferUtils.getUnsignedInt(buffer);
- QpidByteBuffer buf = buffer.view(0, (int)length);
+ ByteBuffer buf = ByteBufferUtils.view(buffer, 0, (int) length);
_headers = new FieldTable(buf);
- buf.dispose();
buffer.position(buffer.position()+(int)length);
}
@@ -958,7 +950,6 @@ public class BasicContentHeaderProperties
{
if(_encodedForm != null)
{
- _encodedForm.dispose();
_encodedForm = null;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java b/client/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
index f7e87dd..44c4b46 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicDeliverBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -104,7 +105,7 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _consumerTag );
writeLong( buffer, _deliveryTag );
@@ -139,7 +140,7 @@ public class BasicDeliverBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicGetBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicGetBody.java b/client/src/main/java/org/apache/qpid/framing/BasicGetBody.java
index 61a2e13..f61dcf4 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicGetBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicGetBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -87,7 +89,7 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _queue );
@@ -114,11 +116,11 @@ public class BasicGetBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString queue = AMQShortString.readAMQShortString(buffer);
boolean noAck = (buffer.get() & 0x01) != 0;
if(!dispatcher.ignoreAllButCloseOk())
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java b/client/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
index dbc52a4..ea3a56d 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicGetEmptyBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -68,7 +69,7 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _clusterId );
}
@@ -87,7 +88,7 @@ public class BasicGetEmptyBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
AMQShortString clusterId = AMQShortString.readAMQShortString(buffer);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java b/client/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
index 4032d50..82101d0 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicGetOkBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -103,7 +105,7 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeLong( buffer, _deliveryTag );
writeBitfield( buffer, _bitfield0 );
@@ -138,14 +140,14 @@ public class BasicGetOkBody extends AMQMethodBodyImpl implements EncodableAMQDat
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
long deliveryTag = buffer.getLong();
boolean redelivered = (buffer.get() & 0x01) != 0;
AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
AMQShortString routingKey = AMQShortString.readAMQShortString(buffer);
- long messageCount = buffer.getUnsignedInt();
+ long messageCount = ByteBufferUtils.getUnsignedInt(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveBasicGetOk(deliveryTag, redelivered, exchange, routingKey, messageCount);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicNackBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicNackBody.java b/client/src/main/java/org/apache/qpid/framing/BasicNackBody.java
index fbb26d1..daa302a 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicNackBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicNackBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicNackBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -91,7 +92,7 @@ public class BasicNackBody extends AMQMethodBodyImpl implements EncodableAMQData
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeLong( buffer, _deliveryTag );
writeBitfield( buffer, _bitfield0 );
@@ -117,7 +118,7 @@ public class BasicNackBody extends AMQMethodBodyImpl implements EncodableAMQData
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ChannelMethodProcessor dispatcher)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicPublishBody.java b/client/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
index fbf08d8..164e5ef 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicPublishBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -105,7 +107,7 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _ticket );
writeAMQShortString( buffer, _exchange );
@@ -139,11 +141,11 @@ public class BasicPublishBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
- int ticket = buffer.getUnsignedShort();
+ int ticket = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
AMQShortString routingKey = AMQShortString.readAMQShortString(buffer);
byte bitfield = buffer.get();
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicQosBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicQosBody.java b/client/src/main/java/org/apache/qpid/framing/BasicQosBody.java
index c3caa8a..a33db2c 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicQosBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicQosBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -86,7 +88,7 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedInteger( buffer, _prefetchSize );
writeUnsignedShort( buffer, _prefetchCount );
@@ -113,12 +115,12 @@ public class BasicQosBody extends AMQMethodBodyImpl implements EncodableAMQDataB
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
- long prefetchSize = buffer.getUnsignedInt();
- int prefetchCount = buffer.getUnsignedShort();
+ long prefetchSize = ByteBufferUtils.getUnsignedInt(buffer);
+ int prefetchCount = ByteBufferUtils.getUnsignedShort(buffer);
boolean global = (buffer.get() & 0x01) == 0x01;
if(!dispatcher.ignoreAllButCloseOk())
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicQosOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicQosOkBody.java b/client/src/main/java/org/apache/qpid/framing/BasicQosOkBody.java
index 7e3f867..3e73136 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicQosOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicQosOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicQosOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -60,7 +61,7 @@ public class BasicQosOkBody extends AMQMethodBodyImpl implements EncodableAMQDat
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java b/client/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
index 04190ef..f75e9e8 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicRecoverBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -72,7 +73,7 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeBitfield( buffer, _bitfield0 );
}
@@ -91,7 +92,7 @@ public class BasicRecoverBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final QpidByteBuffer in,
+ public static void process(final ByteBuffer in,
final ProtocolVersion protocolVersion,
final ServerChannelMethodProcessor dispatcher)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java b/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
index b5d78ae..4af0504 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -74,7 +75,7 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeBitfield( buffer, _bitfield0 );
}
@@ -93,7 +94,7 @@ public class BasicRecoverSyncBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static void process(final QpidByteBuffer in,
+ public static void process(final ByteBuffer in,
final ServerChannelMethodProcessor dispatcher)
{
boolean requeue = (in.get() & 0x01) == 0x01;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java b/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java
index 0dba591..a246f12 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicRecoverSyncOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicRecoverSyncOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -68,7 +69,7 @@ public class BasicRecoverSyncOkBody extends AMQMethodBodyImpl implements Encodab
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicRejectBody.java b/client/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
index 854916e..8fd30e6 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicRejectBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -79,7 +80,7 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeLong( buffer, _deliveryTag );
writeBitfield( buffer, _bitfield0 );
@@ -102,7 +103,7 @@ public class BasicRejectBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/BasicReturnBody.java b/client/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
index a5f7511..2dd3a6b 100644
--- a/client/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/BasicReturnBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -91,7 +93,7 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _replyCode );
writeAMQShortString( buffer, _replyText );
@@ -122,11 +124,11 @@ public class BasicReturnBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
{
- int replyCode = buffer.getUnsignedShort();
+ int replyCode = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString replyText = AMQShortString.readAMQShortString(buffer);
AMQShortString exchange = AMQShortString.readAMQShortString(buffer);
AMQShortString routingKey = AMQShortString.readAMQShortString(buffer);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java b/client/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
index e4065c8..e06aa65 100644
--- a/client/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ChannelAlertBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -83,7 +85,7 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _replyCode );
writeAMQShortString( buffer, _replyText );
@@ -110,12 +112,12 @@ public class ChannelAlertBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ClientChannelMethodProcessor dispatcher)
throws AMQFrameDecodingException
{
- int replyCode = buffer.getUnsignedShort();
+ int replyCode = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString replyText = AMQShortString.readAMQShortString(buffer);
FieldTable details = EncodingUtils.readFieldTable(buffer);
if(!dispatcher.ignoreAllButCloseOk())
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java b/client/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
index 03ebd01..961d58d 100644
--- a/client/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ChannelCloseBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -89,7 +91,7 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _replyCode );
writeAMQShortString( buffer, _replyText );
@@ -120,14 +122,14 @@ public class ChannelCloseBody extends AMQMethodBodyImpl implements EncodableAMQD
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ChannelMethodProcessor dispatcher)
{
- int replyCode = buffer.getUnsignedShort();
+ int replyCode = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString replyText = AMQShortString.readAMQShortString(buffer);
- int classId = buffer.getUnsignedShort();
- int methodId = buffer.getUnsignedShort();
+ int classId = ByteBufferUtils.getUnsignedShort(buffer);
+ int methodId = ByteBufferUtils.getUnsignedShort(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveChannelClose(replyCode, replyText, classId, methodId);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java b/client/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
index 3b1366e..42eedfb 100644
--- a/client/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ChannelCloseOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -59,7 +60,7 @@ public class ChannelCloseOkBody extends AMQMethodBodyImpl implements EncodableAM
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java b/client/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
index 5cf1e38..30bac89 100644
--- a/client/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ChannelFlowBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -64,7 +65,7 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa
return 1;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeBitfield( buffer, _active ? (byte)1 : (byte)0);
}
@@ -83,7 +84,7 @@ public class ChannelFlowBody extends AMQMethodBodyImpl implements EncodableAMQDa
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ChannelMethodProcessor dispatcher)
{
boolean active = (buffer.get() & 0x01) == 0x01;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java b/client/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
index c91bc26..d783125 100644
--- a/client/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ChannelFlowOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -65,7 +66,7 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeBitfield( buffer, _active ? (byte)1 : (byte)0 );
}
@@ -84,7 +85,7 @@ public class ChannelFlowOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ChannelMethodProcessor dispatcher)
{
boolean active = (buffer.get() & 0x01) == 0x01;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java b/client/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
index 5a111b0..607ca96 100644
--- a/client/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
+++ b/client/src/main/java/org/apache/qpid/framing/ChannelMethodProcessor.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.framing;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import java.nio.ByteBuffer;
public interface ChannelMethodProcessor
{
@@ -32,7 +32,7 @@ public interface ChannelMethodProcessor
void receiveChannelCloseOk();
- void receiveMessageContent(QpidByteBuffer data);
+ void receiveMessageContent(ByteBuffer data);
void receiveMessageHeader(BasicContentHeaderProperties properties, long bodySize);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java b/client/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
index fbf66ed..0b8eaa0 100644
--- a/client/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ChannelOpenBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -57,7 +58,7 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa
return 1;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, null );
}
@@ -73,7 +74,7 @@ public class ChannelOpenBody extends AMQMethodBodyImpl implements EncodableAMQDa
}
public static void process(final int channelId,
- final QpidByteBuffer buffer,
+ final ByteBuffer buffer,
final ServerMethodProcessor dispatcher)
{
AMQShortString.readAMQShortString(buffer);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java b/client/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
index 82d2388..92588f1 100644
--- a/client/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ChannelOpenOkBody.java
@@ -28,9 +28,9 @@
package org.apache.qpid.framing;
import java.io.IOException;
+import java.nio.ByteBuffer;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -41,7 +41,7 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ
public static final ChannelOpenOkBody INSTANCE_0_8 = new ChannelOpenOkBody(true);
public static final ChannelOpenOkBody INSTANCE_0_9 = new ChannelOpenOkBody(false);
- public static ChannelOpenOkBody getInstance(ProtocolVersion protocolVersion, QpidByteBuffer input)
+ public static ChannelOpenOkBody getInstance(ProtocolVersion protocolVersion, ByteBuffer input)
throws IOException
{
final boolean isAMQP08 = ProtocolVersion.v0_8.equals(protocolVersion);
@@ -77,7 +77,7 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return _isAMQP08 ? 0 : 4;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
if(!_isAMQP08)
{
@@ -95,7 +95,7 @@ public class ChannelOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQ
return "[ChannelOpenOkBody]";
}
- public static void process(final QpidByteBuffer in,
+ public static void process(final ByteBuffer in,
final ProtocolVersion protocolVersion,
final ClientChannelMethodProcessor dispatcher)
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java b/client/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
index 59bdc08..3ec99e5 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConfirmSelectBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ConfirmSelectBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -64,7 +65,7 @@ public class ConfirmSelectBody extends AMQMethodBodyImpl implements EncodableAMQ
return 1;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeBitfield( buffer, _nowait ? (byte)1 : (byte)0 );
}
@@ -83,7 +84,7 @@ public class ConfirmSelectBody extends AMQMethodBodyImpl implements EncodableAMQ
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer,
+ public static void process(final ByteBuffer buffer,
final ServerChannelMethodProcessor dispatcher)
{
boolean nowait = (buffer.get() & 0x01) == 0x01;
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java b/client/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
index a6f8317..11e46f2 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConfirmSelectOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ConfirmSelectOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -58,7 +59,7 @@ public class ConfirmSelectOkBody extends AMQMethodBodyImpl implements EncodableA
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
index 2550b06..117de14 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionCloseBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class ConnectionCloseBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -90,7 +92,7 @@ public class ConnectionCloseBody extends AMQMethodBodyImpl implements EncodableA
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _replyCode );
writeAMQShortString( buffer, _replyText );
@@ -121,12 +123,12 @@ public class ConnectionCloseBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer, final MethodProcessor dispatcher)
+ public static void process(final ByteBuffer buffer, final MethodProcessor dispatcher)
{
- int replyCode = buffer.getUnsignedShort();
+ int replyCode = ByteBufferUtils.getUnsignedShort(buffer);
AMQShortString replyText = AMQShortString.readAMQShortString(buffer);
- int classId = buffer.getUnsignedShort();
- int methodId = buffer.getUnsignedShort();
+ int classId = ByteBufferUtils.getUnsignedShort(buffer);
+ int methodId = ByteBufferUtils.getUnsignedShort(buffer);
dispatcher.receiveConnectionClose(replyCode, replyText, classId, methodId);
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionCloseOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionCloseOkBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionCloseOkBody.java
index b0e0193..d4c59f7 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionCloseOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionCloseOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ConnectionCloseOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -62,7 +63,7 @@ public class ConnectionCloseOkBody extends AMQMethodBodyImpl implements Encodabl
return 0;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
index 123201c..3062233 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionOpenBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -83,7 +84,7 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _virtualHost );
writeAMQShortString( buffer, _capabilities );
@@ -110,7 +111,7 @@ public class ConnectionOpenBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer, final ServerMethodProcessor dispatcher)
+ public static void process(final ByteBuffer buffer, final ServerMethodProcessor dispatcher)
{
AMQShortString virtualHost = AMQShortString.readAMQShortString(buffer);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
index 6e909d5..e0afce2 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionOpenOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -69,7 +70,7 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _knownHosts );
}
@@ -88,7 +89,7 @@ public class ConnectionOpenOkBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer, final ClientMethodProcessor dispatcher)
+ public static void process(final ByteBuffer buffer, final ClientMethodProcessor dispatcher)
{
AMQShortString knownHosts = AMQShortString.readAMQShortString(buffer);
if(!dispatcher.ignoreAllButCloseOk())
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
index d110f85..65657dc 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionRedirectBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ConnectionRedirectBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -75,7 +76,7 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeAMQShortString( buffer, _host );
writeAMQShortString( buffer, _knownHosts );
@@ -98,7 +99,7 @@ public class ConnectionRedirectBody extends AMQMethodBodyImpl implements Encodab
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer, final ClientMethodProcessor dispatcher)
+ public static void process(final ByteBuffer buffer, final ClientMethodProcessor dispatcher)
{
AMQShortString host = AMQShortString.readAMQShortString(buffer);
AMQShortString knownHosts = AMQShortString.readAMQShortString(buffer);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
index 2f47487..4c73279 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionSecureBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ConnectionSecureBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -69,7 +70,7 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeBytes( buffer, _challenge );
}
@@ -88,7 +89,7 @@ public class ConnectionSecureBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static void process(final QpidByteBuffer in, final ClientMethodProcessor dispatcher)
+ public static void process(final ByteBuffer in, final ClientMethodProcessor dispatcher)
throws AMQFrameDecodingException
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
index 6566b1d..48108fb 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionSecureOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -68,7 +69,7 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeBytes( buffer, _response );
}
@@ -87,7 +88,7 @@ public class ConnectionSecureOkBody extends AMQMethodBodyImpl implements Encodab
return buf.toString();
}
- public static void process(final QpidByteBuffer in, final ServerMethodProcessor dispatcher)
+ public static void process(final ByteBuffer in, final ServerMethodProcessor dispatcher)
{
byte[] response = EncodingUtils.readBytes(in);
if(!dispatcher.ignoreAllButCloseOk())
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
index 682d968..3b82916 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionStartBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -98,7 +100,7 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedByte( buffer, _versionMajor );
writeUnsignedByte( buffer, _versionMinor );
@@ -133,11 +135,11 @@ public class ConnectionStartBody extends AMQMethodBodyImpl implements EncodableA
return buf.toString();
}
- public static void process(final QpidByteBuffer in, final ClientMethodProcessor dispatcher)
+ public static void process(final ByteBuffer in, final ClientMethodProcessor dispatcher)
throws AMQFrameDecodingException
{
- short versionMajor = in.getUnsignedByte();
- short versionMinor = in.getUnsignedByte();
+ short versionMajor = ByteBufferUtils.getUnsignedByte(in);
+ short versionMinor = ByteBufferUtils.getUnsignedByte(in);
FieldTable serverProperties = EncodingUtils.readFieldTable(in);
byte[] mechanisms = EncodingUtils.readBytes(in);
byte[] locales = EncodingUtils.readBytes(in);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
index cd1908c..d8aa2e3 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionStartOkBody.java
@@ -27,8 +27,9 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
public class ConnectionStartOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -92,7 +93,7 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeFieldTable( buffer, _clientProperties );
writeAMQShortString( buffer, _mechanism );
@@ -123,7 +124,7 @@ public class ConnectionStartOkBody extends AMQMethodBodyImpl implements Encodabl
return buf.toString();
}
- public static void process(final QpidByteBuffer in, final ServerMethodProcessor dispatcher)
+ public static void process(final ByteBuffer in, final ServerMethodProcessor dispatcher)
throws AMQFrameDecodingException
{
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
index f73385d..bd31b2f 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionTuneBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -81,7 +83,7 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _channelMax );
writeUnsignedInteger( buffer, _frameMax );
@@ -108,12 +110,12 @@ public class ConnectionTuneBody extends AMQMethodBodyImpl implements EncodableAM
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer, final ClientMethodProcessor dispatcher)
+ public static void process(final ByteBuffer buffer, final ClientMethodProcessor dispatcher)
{
- int channelMax = buffer.getUnsignedShort();
- long frameMax = buffer.getUnsignedInt();
- int heartbeat = buffer.getUnsignedShort();
+ int channelMax = ByteBufferUtils.getUnsignedShort(buffer);
+ long frameMax = ByteBufferUtils.getUnsignedInt(buffer);
+ int heartbeat = ByteBufferUtils.getUnsignedShort(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveConnectionTune(channelMax, frameMax, heartbeat);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java b/client/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
index 35c2340..aef177d 100644
--- a/client/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ConnectionTuneOkBody.java
@@ -27,8 +27,10 @@
package org.apache.qpid.framing;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
+import org.apache.qpid.util.ByteBufferUtils;
public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements EncodableAMQDataBlock, AMQMethodBody
{
@@ -81,7 +83,7 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable
return size;
}
- public void writeMethodPayload(QpidByteBuffer buffer)
+ public void writeMethodPayload(ByteBuffer buffer)
{
writeUnsignedShort( buffer, _channelMax );
writeUnsignedInteger( buffer, _frameMax );
@@ -108,12 +110,12 @@ public class ConnectionTuneOkBody extends AMQMethodBodyImpl implements Encodable
return buf.toString();
}
- public static void process(final QpidByteBuffer buffer, final ServerMethodProcessor dispatcher)
+ public static void process(final ByteBuffer buffer, final ServerMethodProcessor dispatcher)
{
- int channelMax = buffer.getUnsignedShort();
- long frameMax = buffer.getUnsignedInt();
- int heartbeat = buffer.getUnsignedShort();
+ int channelMax = ByteBufferUtils.getUnsignedShort(buffer);
+ long frameMax = ByteBufferUtils.getUnsignedInt(buffer);
+ int heartbeat = ByteBufferUtils.getUnsignedShort(buffer);
if(!dispatcher.ignoreAllButCloseOk())
{
dispatcher.receiveConnectionTuneOk(channelMax, frameMax, heartbeat);
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/95203e5d/client/src/main/java/org/apache/qpid/framing/ContentBody.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/framing/ContentBody.java b/client/src/main/java/org/apache/qpid/framing/ContentBody.java
index dd975ab..dbe0a55 100644
--- a/client/src/main/java/org/apache/qpid/framing/ContentBody.java
+++ b/client/src/main/java/org/apache/qpid/framing/ContentBody.java
@@ -23,28 +23,22 @@ package org.apache.qpid.framing;
import java.nio.ByteBuffer;
import org.apache.qpid.QpidException;
-import org.apache.qpid.bytebuffer.QpidByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.transport.ByteBufferSender;
+import org.apache.qpid.util.ByteBufferUtils;
public class ContentBody implements AMQBody
{
public static final byte TYPE = 3;
- private QpidByteBuffer _payload;
+ private ByteBuffer _payload;
public ContentBody(ByteBuffer payload)
{
- _payload = QpidByteBuffer.wrap(payload.duplicate());
- }
-
- public ContentBody(QpidByteBuffer payload)
- {
_payload = payload.duplicate();
}
-
public byte getFrameType()
{
return TYPE;
@@ -66,9 +60,7 @@ public class ContentBody implements AMQBody
{
if(_payload != null)
{
- final QpidByteBuffer duplicate = _payload.duplicate();
- sender.send(duplicate);
- duplicate.dispose();
+ sender.send(_payload.duplicate());
return _payload.remaining();
}
else
@@ -77,7 +69,7 @@ public class ContentBody implements AMQBody
}
}
- public QpidByteBuffer getPayload()
+ public ByteBuffer getPayload()
{
return _payload;
}
@@ -86,23 +78,20 @@ public class ContentBody implements AMQBody
{
if (_payload != null)
{
- _payload.dispose();
_payload = null;
}
}
- public static void process(final QpidByteBuffer in,
+ public static void process(final ByteBuffer in,
final ChannelMethodProcessor methodProcessor, final long bodySize)
{
-
- QpidByteBuffer payload = in.view(0, (int) bodySize);
+ ByteBuffer payload = ByteBufferUtils.view(in, 0, (int) bodySize);
if(!methodProcessor.ignoreAllButCloseOk())
{
methodProcessor.receiveMessageContent(payload);
}
- payload.dispose();
in.position(in.position()+(int)bodySize);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org