You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/09/09 19:47:27 UTC

svn commit: r1167311 [3/5] - in /qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/exchange/headers/ broker/src/main/java/org/apache/qpid/server/message/ broker/src/main/java/org/apache/qpid/server/output/amqp0_8/ broker/src/main/java/o...

Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java?rev=1167311&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java Fri Sep  9 17:47:22 2011
@@ -0,0 +1,674 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+class TypedBytesContentReader implements TypedBytesCodes
+{
+
+    private final ByteBuffer _data;
+    private final int _position;
+    private final int _limit;
+
+
+    private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+    private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder();
+
+    private int _byteArrayRemaining = -1;
+
+
+    /**
+     * Creates a reader over the supplied content.
+     *
+     * The buffer is duplicated, and the duplicate's position and limit at construction time
+     * are remembered so that {@link #reset()} and {@link #getData()} can restore them.
+     *
+     * @param data the buffer holding the encoded content
+     */
+    public TypedBytesContentReader(final ByteBuffer data)
+    {
+        final ByteBuffer view = data.duplicate();
+        _data = view;
+        _position = view.position();
+        _limit = view.limit();
+    }
+
+    /**
+     * Verifies that the buffer still holds at least the requested number of unread bytes.
+     *
+     * @param len the number of bytes that the caller is about to read
+     * @throws javax.jms.MessageEOFException if fewer than len bytes remain
+     */
+    protected void checkAvailable(int len) throws MessageEOFException
+    {
+        final boolean enough = _data.remaining() >= len;
+        if (!enough)
+        {
+            throw new MessageEOFException("Unable to read " + len + " bytes");
+        }
+    }
+
+    /**
+     * Consumes and returns the single byte type discriminator at the current position.
+     *
+     * @return the wire type code
+     * @throws javax.jms.MessageEOFException if no bytes remain
+     */
+    protected byte readWireType() throws MessageFormatException, MessageEOFException,
+                                         MessageNotReadableException
+    {
+        checkAvailable(1);
+        final byte wireType = _data.get();
+        return wireType;
+    }
+
+    /**
+     * Reads the next typed field as a boolean.
+     *
+     * Accepts a boolean field, or a string field which is parsed with
+     * {@link Boolean#parseBoolean(String)}. On any other wire type, or on a runtime exception,
+     * the read position is moved back to the start of the field before the exception leaves.
+     *
+     * @return the converted value
+     * @throws JMSException if the field cannot be converted or the content is exhausted
+     */
+    protected boolean readBoolean() throws JMSException
+    {
+        final int start = _data.position();
+        final byte wireType = readWireType();
+        try
+        {
+            if (wireType == BOOLEAN_TYPE)
+            {
+                checkAvailable(1);
+                return readBooleanImpl();
+            }
+            if (wireType == STRING_TYPE)
+            {
+                checkAvailable(1);
+                return Boolean.parseBoolean(readStringImpl());
+            }
+            _data.position(start);
+            throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(start);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads one raw byte and treats any non-zero value as true.
+     * Performs no availability check of its own.
+     */
+    boolean readBooleanImpl()
+    {
+        final byte raw = _data.get();
+        return raw != 0;
+    }
+
+    /**
+     * Reads the next typed field as a byte.
+     *
+     * Accepts a byte field, or a string field which is parsed with {@link Byte#parseByte(String)}.
+     * On any other wire type, or on a runtime exception (for instance a NumberFormatException),
+     * the read position is moved back to the start of the field before the exception leaves.
+     *
+     * @return the converted value
+     * @throws JMSException if the field cannot be converted or the content is exhausted
+     */
+    protected byte readByte() throws JMSException
+    {
+        final int start = _data.position();
+        final byte wireType = readWireType();
+        try
+        {
+            switch (wireType)
+            {
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    return readByteImpl();
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    return Byte.parseByte(readStringImpl());
+                default:
+                    _data.position(start);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+            }
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(start);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads one raw byte at the current position. Performs no availability check of its own.
+     */
+    byte readByteImpl()
+    {
+        final byte value = _data.get();
+        return value;
+    }
+
+    /**
+     * Reads the next typed field as a short.
+     *
+     * Accepts a short field, a byte field (widened), or a string field which is parsed with
+     * {@link Short#parseShort(String)}. On any other wire type, or on a runtime exception,
+     * the read position is moved back to the start of the field before the exception leaves.
+     *
+     * @return the converted value
+     * @throws JMSException if the field cannot be converted or the content is exhausted
+     */
+    protected short readShort() throws JMSException
+    {
+        final int start = _data.position();
+        final byte wireType = readWireType();
+        try
+        {
+            switch (wireType)
+            {
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    return readShortImpl();
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    return Short.parseShort(readStringImpl());
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    return readByteImpl();
+                default:
+                    _data.position(start);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+            }
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(start);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads a raw two byte short at the current position. Performs no availability check of its own.
+     */
+    short readShortImpl()
+    {
+        final short value = _data.getShort();
+        return value;
+    }
+
+    /**
+     * Reads the next typed field as a char. The character is stored as two bytes.
+     *
+     * Only a char field is accepted. A null-string field raises a NullPointerException and any
+     * other wire type a MessageFormatException; in both cases the read position is moved back
+     * to the start of the field first.
+     *
+     * @return the character read from the stream
+     * @throws javax.jms.JMSException if the field is not a char or the content is exhausted
+     */
+    protected char readChar() throws JMSException
+    {
+        final int start = _data.position();
+        final byte wireType = readWireType();
+        try
+        {
+            if (wireType == NULL_STRING_TYPE)
+            {
+                // rethrown by the catch below once the position has been restored
+                throw new NullPointerException();
+            }
+
+            if (wireType == CHAR_TYPE)
+            {
+                checkAvailable(2);
+                return readCharImpl();
+            }
+
+            _data.position(start);
+            throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(start);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads a raw two byte char at the current position. Performs no availability check of its own.
+     */
+    char readCharImpl()
+    {
+        final char value = _data.getChar();
+        return value;
+    }
+
+    /**
+     * Reads the next typed field as an int.
+     *
+     * Accepts an int field, a short or byte field (widened), or a string field which is parsed
+     * with {@link Integer#parseInt(String)}. On any other wire type, or on a runtime exception,
+     * the read position is moved back to the start of the field before the exception leaves.
+     *
+     * @return the converted value
+     * @throws JMSException if the field cannot be converted or the content is exhausted
+     */
+    protected int readInt() throws JMSException
+    {
+        final int start = _data.position();
+        final byte wireType = readWireType();
+        try
+        {
+            switch (wireType)
+            {
+                case INT_TYPE:
+                    checkAvailable(4);
+                    return readIntImpl();
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    return readShortImpl();
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    return Integer.parseInt(readStringImpl());
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    return readByteImpl();
+                default:
+                    _data.position(start);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+            }
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(start);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads a raw four byte int at the current position. Performs no availability check of its own.
+     */
+    protected int readIntImpl()
+    {
+        final int value = _data.getInt();
+        return value;
+    }
+
+    /**
+     * Reads the next typed field as a long.
+     *
+     * Accepts a long field, an int, short or byte field (widened), or a string field which is
+     * parsed with {@link Long#parseLong(String)}. On any other wire type, or on a runtime
+     * exception, the read position is moved back to the start of the field before the
+     * exception leaves.
+     *
+     * @return the converted value
+     * @throws JMSException if the field cannot be converted or the content is exhausted
+     */
+    protected long readLong() throws JMSException
+    {
+        final int start = _data.position();
+        final byte wireType = readWireType();
+        try
+        {
+            switch (wireType)
+            {
+                case LONG_TYPE:
+                    checkAvailable(8);
+                    return readLongImpl();
+                case INT_TYPE:
+                    checkAvailable(4);
+                    return readIntImpl();
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    return readShortImpl();
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    return Long.parseLong(readStringImpl());
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    return readByteImpl();
+                default:
+                    _data.position(start);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+            }
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(start);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads a raw eight byte long at the current position. Performs no availability check of its own.
+     */
+    long readLongImpl()
+    {
+        final long value = _data.getLong();
+        return value;
+    }
+
+    /**
+     * Reads the next typed field as a float.
+     *
+     * Accepts a float field, or a string field which is parsed with
+     * {@link Float#parseFloat(String)}. On any other wire type, or on a runtime exception,
+     * the read position is moved back to the start of the field before the exception leaves.
+     *
+     * @return the converted value
+     * @throws JMSException if the field cannot be converted or the content is exhausted
+     */
+    protected float readFloat() throws JMSException
+    {
+        final int start = _data.position();
+        final byte wireType = readWireType();
+        try
+        {
+            switch (wireType)
+            {
+                case FLOAT_TYPE:
+                    checkAvailable(4);
+                    return readFloatImpl();
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    return Float.parseFloat(readStringImpl());
+                default:
+                    _data.position(start);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+            }
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(start);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads a raw four byte float at the current position. Performs no availability check of its own.
+     */
+    float readFloatImpl()
+    {
+        final float value = _data.getFloat();
+        return value;
+    }
+
+    /**
+     * Reads the next typed field as a double.
+     *
+     * Accepts a double field, a float field (widened), or a string field which is parsed with
+     * {@link Double#parseDouble(String)}. On any other wire type, or on a runtime exception,
+     * the read position is moved back to the start of the field before the exception leaves.
+     *
+     * @return the converted value
+     * @throws JMSException if the field cannot be converted or the content is exhausted
+     */
+    protected double readDouble() throws JMSException
+    {
+        final int start = _data.position();
+        final byte wireType = readWireType();
+        try
+        {
+            switch (wireType)
+            {
+                case DOUBLE_TYPE:
+                    checkAvailable(8);
+                    return readDoubleImpl();
+                case FLOAT_TYPE:
+                    checkAvailable(4);
+                    return readFloatImpl();
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    return Double.parseDouble(readStringImpl());
+                default:
+                    _data.position(start);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+            }
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(start);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads a raw eight byte double at the current position. Performs no availability check of its own.
+     */
+    double readDoubleImpl()
+    {
+        final double value = _data.getDouble();
+        return value;
+    }
+
+    /**
+     * Reads the next typed field and renders it as a String.
+     *
+     * A string field is decoded directly; boolean, long, int, short, byte, float, double and
+     * char fields are converted with {@link String#valueOf}. A null-string field raises a
+     * NullPointerException. On an unsupported wire type, or on a runtime exception, the read
+     * position is moved back to the start of the field before the exception leaves.
+     *
+     * @return the string form of the field
+     * @throws JMSException if the field cannot be converted or the content is exhausted
+     */
+    protected String readString() throws JMSException
+    {
+        final int start = _data.position();
+        final byte wireType = readWireType();
+        try
+        {
+            switch (wireType)
+            {
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    return readStringImpl();
+                case NULL_STRING_TYPE:
+                    // rethrown by the catch below once the position has been restored
+                    throw new NullPointerException("data is null");
+                case BOOLEAN_TYPE:
+                    checkAvailable(1);
+                    return String.valueOf(readBooleanImpl());
+                case LONG_TYPE:
+                    checkAvailable(8);
+                    return String.valueOf(readLongImpl());
+                case INT_TYPE:
+                    checkAvailable(4);
+                    return String.valueOf(readIntImpl());
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    return String.valueOf(readShortImpl());
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    return String.valueOf(readByteImpl());
+                case FLOAT_TYPE:
+                    checkAvailable(4);
+                    return String.valueOf(readFloatImpl());
+                case DOUBLE_TYPE:
+                    checkAvailable(8);
+                    return String.valueOf(readDoubleImpl());
+                case CHAR_TYPE:
+                    checkAvailable(2);
+                    return String.valueOf(readCharImpl());
+                default:
+                    _data.position(start);
+                    throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+            }
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(start);
+            throw e;
+        }
+    }
+
+    /**
+     * Reads a null terminated UTF-8 string starting at the current position and consumes the
+     * string together with its terminator.
+     *
+     * The terminator is located with absolute reads, so when it is missing the read position
+     * is left untouched and a MessageEOFException is raised, rather than running off the end
+     * of the buffer with an unchecked BufferUnderflowException.
+     *
+     * @return the decoded string, without the terminator
+     * @throws javax.jms.MessageEOFException if no null terminator is found before the limit
+     * @throws JMSException if the bytes are not valid UTF-8
+     */
+    protected String readStringImpl() throws JMSException
+    {
+        final int start = _data.position();
+        final int limit = _data.limit();
+
+        // absolute gets: the position only moves once the terminator is known to exist
+        int terminator = start;
+        while (terminator < limit && _data.get(terminator) != 0)
+        {
+            terminator++;
+        }
+        if (terminator == limit)
+        {
+            throw new MessageEOFException("Unable to read string: no null terminator found in the remaining "
+                                          + (limit - start) + " bytes");
+        }
+
+        final ByteBuffer encoded = _data.duplicate();
+        encoded.limit(terminator);
+        // step over the string bytes and the terminator
+        _data.position(terminator + 1);
+
+        try
+        {
+            _charsetDecoder.reset();
+            return _charsetDecoder.decode(encoded).toString();
+        }
+        catch (CharacterCodingException e)
+        {
+            JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+            jmse.setLinkedException(e);
+            jmse.initCause(e);
+            throw jmse;
+        }
+    }
+
+    /**
+     * Reads (part of) a byte array field into the supplied array.
+     *
+     * The first call for a field consumes the type discriminator and the four byte length;
+     * following calls continue copying until the field is exhausted. A call made after the
+     * field was consumed exactly returns -1 and re-arms the reader for the next field.
+     *
+     * If the field is not a byte array the read position is moved back to the start of the
+     * field before the MessageFormatException is thrown, as the other typed reads do.
+     *
+     * @param bytes the destination array, must not be null
+     * @return the number of bytes copied, or -1 for a null array or an already exhausted field
+     * @throws javax.jms.MessageFormatException if the next field is not a byte array
+     * @throws javax.jms.MessageEOFException if the content ends before the stated length
+     * @throws IllegalArgumentException if bytes is null
+     */
+    protected int readBytes(byte[] bytes) throws JMSException
+    {
+        if (bytes == null)
+        {
+            throw new IllegalArgumentException("byte array must not be null");
+        }
+        // first call
+        if (_byteArrayRemaining == -1)
+        {
+            final int start = _data.position();
+            // type discriminator checked separately so you get a MessageFormatException rather than
+            // an EOF even in the case where both would be applicable
+            checkAvailable(1);
+            byte wireType = readWireType();
+            if (wireType != BYTEARRAY_TYPE)
+            {
+                // do not consume the discriminator of a field we refused to convert
+                _data.position(start);
+                throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
+            }
+            checkAvailable(4);
+            int size = _data.getInt();
+            // length of -1 indicates null
+            if (size == -1)
+            {
+                return -1;
+            }
+            else
+            {
+                if (size > _data.remaining())
+                {
+                    throw new MessageEOFException("Byte array has stated length "
+                                                  + size
+                                                  + " but message only contains "
+                                                  +
+                                                  _data.remaining()
+                                                  + " bytes");
+                }
+                else
+                {
+                    _byteArrayRemaining = size;
+                }
+            }
+        }
+        else if (_byteArrayRemaining == 0)
+        {
+            _byteArrayRemaining = -1;
+            return -1;
+        }
+
+        int returnedSize = readBytesImpl(bytes);
+        if (returnedSize < bytes.length)
+        {
+            // a short read means the field is finished
+            _byteArrayRemaining = -1;
+        }
+        return returnedSize;
+    }
+
+    /**
+     * Copies as many of the pending byte array bytes as fit into the destination and reduces
+     * the pending count accordingly.
+     *
+     * @param bytes the destination array
+     * @return the number of bytes copied, possibly zero
+     */
+    private int readBytesImpl(byte[] bytes)
+    {
+        final int toCopy = Math.min(bytes.length, _byteArrayRemaining);
+        _byteArrayRemaining -= toCopy;
+
+        if (toCopy != 0)
+        {
+            _data.get(bytes, 0, toCopy);
+        }
+        return toCopy;
+    }
+
+    /**
+     * Reads the next typed field and returns it as the matching Java object: a boxed
+     * primitive, a String, a byte[] or null (for a null string or a null byte array).
+     *
+     * A byte array is read in one piece. Its stated length is checked against the remaining
+     * content before the array is allocated, and the partial-read state used by
+     * {@link #readBytes(byte[])} is left cleared, so that a following readBytes call starts on
+     * the next field instead of reporting an exhausted array.
+     *
+     * An unrecognised wire type yields null. On a runtime exception the read position is moved
+     * back to the start of the field before the exception leaves.
+     *
+     * @return the value of the field, possibly null
+     * @throws javax.jms.MessageEOFException if the content ends before the field is complete
+     * @throws JMSException if a string field cannot be decoded
+     */
+    protected Object readObject() throws JMSException
+    {
+        int position = _data.position();
+        byte wireType = readWireType();
+        Object result = null;
+        try
+        {
+            switch (wireType)
+            {
+                case BOOLEAN_TYPE:
+                    checkAvailable(1);
+                    result = readBooleanImpl();
+                    break;
+                case BYTE_TYPE:
+                    checkAvailable(1);
+                    result = readByteImpl();
+                    break;
+                case BYTEARRAY_TYPE:
+                    checkAvailable(4);
+                    int size = _data.getInt();
+                    if (size == -1)
+                    {
+                        result = null;
+                    }
+                    else
+                    {
+                        final int available = _data.remaining();
+                        if (size > available)
+                        {
+                            // refuse before allocating an array sized by an unverified length
+                            _data.position(position);
+                            throw new MessageEOFException("Byte array has stated length "
+                                                          + size
+                                                          + " but message only contains "
+                                                          + available
+                                                          + " bytes");
+                        }
+                        byte[] bytesResult = new byte[size];
+                        _data.get(bytesResult);
+                        // the whole array was consumed: no partial read is pending
+                        _byteArrayRemaining = -1;
+                        result = bytesResult;
+                    }
+                    break;
+                case SHORT_TYPE:
+                    checkAvailable(2);
+                    result = readShortImpl();
+                    break;
+                case CHAR_TYPE:
+                    checkAvailable(2);
+                    result = readCharImpl();
+                    break;
+                case INT_TYPE:
+                    checkAvailable(4);
+                    result = readIntImpl();
+                    break;
+                case LONG_TYPE:
+                    checkAvailable(8);
+                    result = readLongImpl();
+                    break;
+                case FLOAT_TYPE:
+                    checkAvailable(4);
+                    result = readFloatImpl();
+                    break;
+                case DOUBLE_TYPE:
+                    checkAvailable(8);
+                    result = readDoubleImpl();
+                    break;
+                case NULL_STRING_TYPE:
+                    result = null;
+                    break;
+                case STRING_TYPE:
+                    checkAvailable(1);
+                    result = readStringImpl();
+                    break;
+            }
+            return result;
+        }
+        catch (RuntimeException e)
+        {
+            _data.position(position);
+            throw e;
+        }
+    }
+
+    /**
+     * Rewinds the reader to the position and limit recorded at construction and discards any
+     * partially read byte array state.
+     */
+    public void reset()
+    {
+        _data.position(_position);
+        _data.limit(_limit);
+        _byteArrayRemaining = -1;
+    }
+
+    /**
+     * Returns a duplicate of the content buffer positioned at the start of the content, with
+     * the position and limit recorded at construction.
+     *
+     * @return a duplicate buffer covering the whole content
+     */
+    public ByteBuffer getData()
+    {
+        final ByteBuffer copy = _data.duplicate();
+        copy.position(_position);
+        copy.limit(_limit);
+        return copy;
+    }
+
+    /**
+     * @return the total length of the content, i.e. the distance between the position and the
+     *         limit recorded at construction
+     */
+    public long size()
+    {
+        final int length = _limit - _position;
+        return length;
+    }
+
+    /**
+     * @return the number of bytes between the current read position and the limit
+     */
+    public int remaining()
+    {
+        final int unread = _data.remaining();
+        return unread;
+    }
+
+    /**
+     * Copies untyped bytes from the current position into the given array region.
+     * No availability check is made here; the copy is delegated directly to the buffer.
+     *
+     * @param bytes  the destination array
+     * @param offset the index in bytes of the first byte to write
+     * @param count  the number of bytes to copy
+     */
+    public void readRawBytes(final byte[] bytes, final int offset, final int count)
+    {
+        _data.get(bytes, offset, count);
+    }
+
+    /**
+     * Reads a UTF-8 string that is preceded by a two byte length.
+     *
+     * The length prefix is interpreted as an unsigned 16 bit value, so encoded lengths of
+     * 32768 to 65535 bytes are read correctly instead of being seen as negative. The prefix and
+     * the stated number of bytes are both checked against the remaining content; when the
+     * string is incomplete the read position is restored to where it was on entry.
+     *
+     * @return the decoded string, empty when the stated length is zero
+     * @throws javax.jms.MessageEOFException if the prefix or the string bytes are incomplete
+     * @throws JMSException if the bytes are not valid UTF-8
+     */
+    public String readLengthPrefixedUTF() throws JMSException
+    {
+        final int start = _data.position();
+        checkAvailable(2);
+        // mask off the sign extension: the prefix is a 16 bit count, not a signed short
+        final int length = readShortImpl() & 0xFFFF;
+        if (length == 0)
+        {
+            return "";
+        }
+
+        final int available = _data.remaining();
+        if (length > available)
+        {
+            _data.position(start);
+            throw new MessageEOFException("String has stated length "
+                                          + length
+                                          + " but message only contains "
+                                          + available
+                                          + " bytes");
+        }
+
+        try
+        {
+            _charsetDecoder.reset();
+            ByteBuffer encodedString = _data.slice();
+            encodedString.limit(length);
+            _data.position(_data.position() + length);
+            CharBuffer string = _charsetDecoder.decode(encodedString);
+
+            return string.toString();
+        }
+        catch (CharacterCodingException e)
+        {
+            JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+            jmse.setLinkedException(e);
+            jmse.initCause(e);
+            throw jmse;
+        }
+    }
+}

Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java?rev=1167311&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java Fri Sep  9 17:47:22 2011
@@ -0,0 +1,370 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+class TypedBytesContentWriter implements TypedBytesCodes
+{
+    private final        ByteArrayOutputStream _baos = new ByteArrayOutputStream();
+    private final        DataOutputStream      _data = new DataOutputStream(_baos);
+    private static final Charset               UTF8 = Charset.forName("UTF-8");
+
+    /**
+     * Appends the one byte type code that precedes every typed value.
+     *
+     * @param type the wire type code
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    protected void writeTypeDiscriminator(byte type) throws JMSException
+    {
+        try
+        {
+            _data.writeByte(type);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Wraps an I/O failure in a JMSException. The original exception is attached both as the
+     * JMS linked exception and as the Java cause, matching the other wrappers in this class.
+     *
+     * @param e the I/O failure
+     * @return the exception for the caller to throw
+     */
+    private JMSException handle(final IOException e)
+    {
+        JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage());
+        jmsEx.setLinkedException(e);
+        jmsEx.initCause(e);
+        return jmsEx;
+    }
+
+
+    /**
+     * Writes a typed boolean: the BOOLEAN_TYPE discriminator followed by the value.
+     *
+     * @param b the value to write
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeBoolean(boolean b) throws JMSException
+    {
+        final byte discriminator = BOOLEAN_TYPE;
+        writeTypeDiscriminator(discriminator);
+        writeBooleanImpl(b);
+    }
+
+    /**
+     * Writes a boolean as a single byte, 1 for true and 0 for false, without a type discriminator.
+     *
+     * @param b the value to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    public void writeBooleanImpl(final boolean b) throws JMSException
+    {
+        try
+        {
+            // DataOutputStream encodes true as (byte) 1 and false as (byte) 0
+            _data.writeBoolean(b);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a typed byte: the BYTE_TYPE discriminator followed by the value.
+     *
+     * @param b the value to write
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeByte(byte b) throws JMSException
+    {
+        final byte discriminator = BYTE_TYPE;
+        writeTypeDiscriminator(discriminator);
+        writeByteImpl(b);
+    }
+
+    /**
+     * Writes a single raw byte without a type discriminator.
+     *
+     * @param b the value to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    public void writeByteImpl(final byte b) throws JMSException
+    {
+        try
+        {
+            _data.writeByte(b);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a typed short: the SHORT_TYPE discriminator followed by the value.
+     *
+     * @param i the value to write
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeShort(short i) throws JMSException
+    {
+        final byte discriminator = SHORT_TYPE;
+        writeTypeDiscriminator(discriminator);
+        writeShortImpl(i);
+    }
+
+    /**
+     * Writes a raw two byte short without a type discriminator.
+     *
+     * @param i the value to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    public void writeShortImpl(final short i) throws JMSException
+    {
+        try
+        {
+            _data.writeShort(i);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a typed char: the CHAR_TYPE discriminator followed by the value.
+     *
+     * @param c the value to write
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeChar(char c) throws JMSException
+    {
+        final byte discriminator = CHAR_TYPE;
+        writeTypeDiscriminator(discriminator);
+        writeCharImpl(c);
+    }
+
+    /**
+     * Writes a raw two byte char without a type discriminator.
+     *
+     * @param c the value to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    public void writeCharImpl(final char c) throws JMSException
+    {
+        try
+        {
+            _data.writeChar(c);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a typed int: the INT_TYPE discriminator followed by the value.
+     *
+     * @param i the value to write
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeInt(int i) throws JMSException
+    {
+        final byte discriminator = INT_TYPE;
+        writeTypeDiscriminator(discriminator);
+        writeIntImpl(i);
+    }
+
+    /**
+     * Writes a raw four byte int without a type discriminator.
+     *
+     * @param i the value to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    protected void writeIntImpl(int i) throws JMSException
+    {
+        try
+        {
+            _data.writeInt(i);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a typed long: the LONG_TYPE discriminator followed by the value.
+     *
+     * @param l the value to write
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeLong(long l) throws JMSException
+    {
+        final byte discriminator = LONG_TYPE;
+        writeTypeDiscriminator(discriminator);
+        writeLongImpl(l);
+    }
+
+    /**
+     * Writes a raw eight byte long without a type discriminator.
+     *
+     * @param l the value to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    public void writeLongImpl(final long l) throws JMSException
+    {
+        try
+        {
+            _data.writeLong(l);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a typed float: the FLOAT_TYPE discriminator followed by the value.
+     *
+     * @param v the value to write
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeFloat(float v) throws JMSException
+    {
+        final byte discriminator = FLOAT_TYPE;
+        writeTypeDiscriminator(discriminator);
+        writeFloatImpl(v);
+    }
+
+    /**
+     * Writes a raw four byte float without a type discriminator.
+     *
+     * @param v the value to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    public void writeFloatImpl(final float v) throws JMSException
+    {
+        try
+        {
+            _data.writeFloat(v);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a typed double: the DOUBLE_TYPE discriminator followed by the value.
+     *
+     * @param v the value to write
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeDouble(double v) throws JMSException
+    {
+        final byte discriminator = DOUBLE_TYPE;
+        writeTypeDiscriminator(discriminator);
+        writeDoubleImpl(v);
+    }
+
+    /**
+     * Writes a raw eight byte double without a type discriminator.
+     *
+     * @param v the value to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    public void writeDoubleImpl(final double v) throws JMSException
+    {
+        try
+        {
+            _data.writeDouble(v);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a typed string. A null reference is written as the NULL_STRING_TYPE discriminator
+     * alone; any other string as the STRING_TYPE discriminator followed by the null terminated
+     * encoding of the string.
+     *
+     * @param string the value to write, may be null
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeString(String string) throws JMSException
+    {
+        if (string != null)
+        {
+            writeTypeDiscriminator(STRING_TYPE);
+            writeNullTerminatedStringImpl(string);
+            return;
+        }
+        writeTypeDiscriminator(NULL_STRING_TYPE);
+    }
+
+    /**
+     * Writes the UTF-8 bytes of the string followed by a single zero byte, without a type
+     * discriminator.
+     *
+     * @param string the string to write, must not be null
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    protected void writeNullTerminatedStringImpl(String string)
+            throws JMSException
+    {
+        final byte[] encoded = string.getBytes(UTF8);
+        try
+        {
+            _data.write(encoded);
+            _data.writeByte(0);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a whole array as a typed byte array field; a null array is passed on with a
+     * length of zero.
+     *
+     * @param bytes the array to write, may be null
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeBytes(byte[] bytes) throws JMSException
+    {
+        int length = 0;
+        if (bytes != null)
+        {
+            length = bytes.length;
+        }
+        writeBytes(bytes, 0, length);
+    }
+
+    /**
+     * Writes a region of an array as a typed byte array field: the BYTEARRAY_TYPE
+     * discriminator followed by the length prefixed bytes.
+     *
+     * @param bytes  the source array, may be null
+     * @param offset the index of the first byte to write
+     * @param length the number of bytes to write
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+    {
+        final byte discriminator = BYTEARRAY_TYPE;
+        writeTypeDiscriminator(discriminator);
+        writeBytesImpl(bytes, offset, length);
+    }
+
+    /**
+     * Writes a four byte length followed by the given region of the array, without a type
+     * discriminator. A null array is written as a length of -1 and no data.
+     *
+     * @param bytes  the source array, may be null
+     * @param offset the index of the first byte to write
+     * @param length the number of bytes to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException
+    {
+        try
+        {
+            if (bytes != null)
+            {
+                _data.writeInt(length);
+                _data.write(bytes, offset, length);
+                return;
+            }
+            // -1 is the marker for a null array
+            _data.writeInt(-1);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+    /**
+     * Writes a region of an array with neither type discriminator nor length prefix.
+     * A null array writes nothing.
+     *
+     * @param bytes  the source array, may be null
+     * @param offset the index of the first byte to write
+     * @param length the number of bytes to write
+     * @throws JMSException if the underlying stream reports an I/O failure
+     */
+    public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException
+    {
+        if (bytes == null)
+        {
+            return;
+        }
+        try
+        {
+            _data.write(bytes, offset, length);
+        }
+        catch (IOException ioe)
+        {
+            throw handle(ioe);
+        }
+    }
+
+
+    /**
+     * Writes an object as the typed field matching its runtime class. Boxed primitives,
+     * String and byte[] are supported; null is written the way a null String is.
+     *
+     * @param object the value to write, may be null
+     * @throws javax.jms.MessageFormatException if the object is of an unsupported class
+     * @throws JMSException if the value cannot be written
+     */
+    protected void writeObject(Object object) throws JMSException
+    {
+        if (object == null || object instanceof String)
+        {
+            // string handles the output of null values
+            writeString((String) object);
+        }
+        else if (object instanceof Byte)
+        {
+            writeByte((Byte) object);
+        }
+        else if (object instanceof Boolean)
+        {
+            writeBoolean((Boolean) object);
+        }
+        else if (object instanceof byte[])
+        {
+            writeBytes((byte[]) object);
+        }
+        else if (object instanceof Short)
+        {
+            writeShort((Short) object);
+        }
+        else if (object instanceof Character)
+        {
+            writeChar((Character) object);
+        }
+        else if (object instanceof Integer)
+        {
+            writeInt((Integer) object);
+        }
+        else if (object instanceof Long)
+        {
+            writeLong((Long) object);
+        }
+        else if (object instanceof Float)
+        {
+            writeFloat((Float) object);
+        }
+        else if (object instanceof Double)
+        {
+            writeDouble((Double) object);
+        }
+        else
+        {
+            throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+        }
+    }
+
+    /**
+     * @return a buffer wrapping a copy of everything written so far
+     */
+    public ByteBuffer getData()
+    {
+        final byte[] written = _baos.toByteArray();
+        return ByteBuffer.wrap(written);
+    }
+
+    /**
+     * Writes a string as a two byte length followed by its UTF-8 encoding, without a type
+     * discriminator.
+     *
+     * The length prefix is only 16 bits wide, so a string whose encoding is longer than 65535
+     * bytes is rejected instead of being written with a truncated length.
+     *
+     * @param string the string to write
+     * @throws JMSException if the string cannot be encoded, if its encoding does not fit the
+     *                      two byte length prefix, or if the underlying stream fails
+     */
+    public void writeLengthPrefixedUTF(final String string) throws JMSException
+    {
+        try
+        {
+            CharsetEncoder encoder = UTF8.newEncoder();
+            ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
+
+            final int encodedLength = encodedString.remaining();
+            if (encodedLength > 0xFFFF)
+            {
+                throw new JMSException("Unable to encode string: encoded length of "
+                                       + encodedLength
+                                       + " bytes exceeds the maximum of 65535 bytes");
+            }
+
+            // values above Short.MAX_VALUE keep their low 16 bits in the cast
+            writeShortImpl((short) encodedLength);
+
+            final byte[] encoded = new byte[encodedLength];
+            encodedString.get(encoded);
+            _data.write(encoded, 0, encodedLength);
+        }
+        catch (CharacterCodingException e)
+        {
+            JMSException jmse = new JMSException("Unable to encode string: " + e);
+            jmse.setLinkedException(e);
+            jmse.initCause(e);
+            throw jmse;
+        }
+        catch (IOException e)
+        {
+            throw handle(e);
+        }
+
+    }
+}

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Fri Sep  9 17:47:22 2011
@@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 exte
     public void receiveBody(ContentBody body)
     {
 
-        if (body.payload != null)
+        if (body._payload != null)
         {
-            final long payloadSize = body.payload.remaining();
+            final long payloadSize = body._payload.length;
 
             if (_bodies == null)
             {

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Sep  9 17:47:22 2011
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.client.protocol;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -524,7 +526,7 @@ public class AMQProtocolHandler implemen
 
     public  synchronized void writeFrame(AMQDataBlock frame, boolean wait)
     {
-        final ByteBuffer buf = frame.toNioByteBuffer();
+        final ByteBuffer buf = asByteBuffer(frame);
         _writtenBytes += buf.remaining();
         _sender.send(buf);
         _sender.flush();
@@ -547,6 +549,39 @@ public class AMQProtocolHandler implemen
 
     }
 
+    private ByteBuffer asByteBuffer(AMQDataBlock block) // Serialises the block into a new heap ByteBuffer that is ready for reading.
+    {
+        final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize()); // capacity is exactly getSize(); the cast suggests getSize() is wider than int - TODO confirm no overflow for large blocks
+
+        try
+        {
+            block.writePayload(new DataOutputStream(new OutputStream() // anonymous OutputStream that forwards every write into buf
+            {
+
+
+                @Override
+                public void write(int b) throws IOException
+                {
+                    buf.put((byte) b); // only the low-order byte of b is stored
+                }
+
+                @Override
+                public void write(byte[] b, int off, int len) throws IOException
+                {
+                    buf.put(b, off, len); // bulk path; put() throws BufferOverflowException if the block writes more than getSize() bytes
+                }
+            }));
+        }
+        catch (IOException e)
+        {
+            throw new RuntimeException(e); // rethrown unchecked with the IOException as cause
+        }
+
+        buf.flip(); // switch from writing to reading: limit = bytes written, position = 0
+        return buf;
+    }
+
+
     /**
      * Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
      * calling getProtocolSession().write() then waiting for the response.

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java Fri Sep  9 17:47:22 2011
@@ -20,13 +20,13 @@
  */
 package org.apache.qpid.client.util;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
 import java.io.InputStream;
 import java.io.ObjectOutputStream;
-import java.lang.reflect.Proxy;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.test.utils.QpidTestCase;
 
 public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase
@@ -37,16 +37,15 @@ public class ClassLoadingAwareObjectInpu
     protected void setUp() throws Exception
     {
         //Create a viable input stream for instantiating the CLA OIS
-        ByteBuffer buf = ByteBuffer.allocate(10);
-        buf.setAutoExpand(true);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
 
-        ObjectOutputStream out = new ObjectOutputStream(buf.asOutputStream());
+        ObjectOutputStream out = new ObjectOutputStream(baos);
         out.writeObject("testString");
         out.flush();
         out.close();
 
-        buf.rewind();
-        _in = buf.asInputStream();
+
+        _in = new ByteArrayInputStream(baos.toByteArray());
 
         _claOIS = new ClassLoadingAwareObjectInputStream(_in);
     }

Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java Fri Sep  9 17:47:22 2011
@@ -45,8 +45,6 @@ public class ObjectMessageUnitTest exten
         _om.setObject(true);
 
         //make the message readable
-        _om.reset();
-
         Object object = _om.getObject();
 
         assertTrue("Unexpected type returned", object instanceof Boolean);
@@ -61,8 +59,6 @@ public class ObjectMessageUnitTest exten
         _om.setObject("test string");
 
         //make the message readable
-        _om.reset();
-
         Object object = _om.getObject();
 
         assertTrue("Unexpected type returned", object instanceof String);
@@ -87,7 +83,6 @@ public class ObjectMessageUnitTest exten
         list.add(0);
 
         //make the message readable
-        _om.reset();
 
         //retrieve the Object
         Object object = _om.getObject();

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Sep  9 17:47:22 2011
@@ -20,10 +20,9 @@
  */
 package org.apache.qpid.codec;
 
-import java.util.ArrayList;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.SimpleByteBufferAllocator;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
 
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQDataBlockDecoder;
@@ -61,12 +60,11 @@ public class AMQDecoder
 
     /** Flag to indicate whether this decoder needs to handle protocol initiation. */
     private boolean _expectProtocolInitiation;
-    private boolean firstDecode = true;
 
     private AMQMethodBodyFactory _bodyFactory;
 
-    private ByteBuffer _remainingBuf;
-    
+    private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
+
     /**
      * Creates a new AMQP decoder.
      *
@@ -92,62 +90,168 @@ public class AMQDecoder
         _expectProtocolInitiation = expectProtocolInitiation;
     }
 
+    private class RemainingByteArrayInputStream extends InputStream
+    {
+        private int _currentListPos;
+        private int _markPos;
 
-    private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
 
-    public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
-    {
+        @Override
+        public int read() throws IOException // Reads one byte, stepping to the next pending stream when the current one is drained.
+        {
+            ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos); // NOTE(review): runs before the end-of-list check below, so it throws IndexOutOfBoundsException once _currentListPos == _remainingBufs.size()
+            if(currentStream.available() > 0)
+            {
+                return currentStream.read();
+            }
+            else if((_currentListPos == _remainingBufs.size())
+                    || (++_currentListPos == _remainingBufs.size())) // pre-increment: advance to the next stream and test whether the list is exhausted
+            {
+                return -1; // no stream left to read from
+            }
+            else
+            {
 
-        // get prior remaining data from accumulator
-        ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
-        ByteBuffer msg;
-        // if we have a session buffer, append data to that otherwise
-        // use the buffer read from the network directly
-        if( _remainingBuf != null )
-        {
-            _remainingBuf.put(buf);
-            _remainingBuf.flip();
-            msg = _remainingBuf;
+                ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
+                stream.mark(0); // record the position of the newly entered stream so a later reset() can rewind it
+                return stream.read(); // NOTE(review): would return -1 if this stream is empty even when later streams still hold data - confirm empty streams never get queued
+            }
         }
-        else
+
+        @Override
+        public int read(final byte[] b, final int off, final int len) throws IOException // Bulk read that recurses into the following streams when the current one cannot supply len bytes.
         {
-            msg = ByteBuffer.wrap(buf);
+
+            if(_currentListPos == _remainingBufs.size())
+            {
+                return -1; // already past the last stream
+            }
+            else
+            {
+                ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
+                final int available = currentStream.available();
+                int read = currentStream.read(b, off, len > available ? available : len); // never ask the current stream for more than it holds
+                if(read < len)
+                {
+                    if(_currentListPos++ != _remainingBufs.size()) // NOTE(review): post-increment compares the OLD index, which is always < size in this branch, so the test is always true
+                    {
+                        _remainingBufs.get(_currentListPos).mark(0); // NOTE(review): out of bounds when the stream just drained was the last one - confirm
+                    }
+                    int correctRead = read == -1 ? 0 : read; // a drained stream reports -1; count that as zero bytes
+                    int subRead = read(b, off+correctRead, len-correctRead); // recurse to fill the remainder from the next stream
+                    if(subRead == -1)
+                    {
+                        return read; // nothing further available: report what this stream gave (possibly -1)
+                    }
+                    else
+                    {
+                        return correctRead+subRead;
+                    }
+                }
+                else
+                {
+                    return len; // request satisfied entirely by the current stream
+                }
+            }
         }
-        
-        if (_expectProtocolInitiation  
-            || (firstDecode
-                && (msg.remaining() > 0)
-                && (msg.get(msg.position()) == (byte)'A')))
+
+        @Override
+        public int available() throws IOException // Sums the unread bytes of the current stream and of every stream after it.
         {
-            if (_piDecoder.decodable(msg.buf()))
+            int total = 0;
+            for(int i = _currentListPos; i < _remainingBufs.size(); i++) // streams before _currentListPos are not counted
             {
-                dataBlocks.add(new ProtocolInitiation(msg.buf()));
+                total += _remainingBufs.get(i).available();
             }
+            return total;
         }
-        else
+
+        @Override
+        public void mark(final int readlimit) // Records the current list index and marks the current stream; NOTE(review): markSupported() is not overridden in this class, so it still reports false
         {
-            boolean enoughData = true;
-            while (enoughData)
+            _markPos = _currentListPos; // list index that reset() will return to
+            final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos); // NOTE(review): throws IndexOutOfBoundsException if _currentListPos == size; get() does not return null for a missing index
+            if(stream != null) // only guards against a null element stored in the list
             {
-                int pos = msg.position();
+                stream.mark(readlimit);
+            }
+        }
 
+        @Override
+        public void reset() throws IOException // Returns to the list index saved by mark() and rewinds that stream and every later one.
+        {
+            _currentListPos = _markPos;
+            final int size = _remainingBufs.size();
+            if(_currentListPos < size) // guard: the saved index may equal the list size
+            {
+                _remainingBufs.get(_currentListPos).reset(); // back to the position recorded by mark()
+            }
+            for(int i = _currentListPos+1; i<size; i++) // later streams may have been partly read since the mark
+            {
+                _remainingBufs.get(i).reset(); // each returns to its own mark: the mark(0) set when a read first entered it, otherwise its initial mark
+            }
+        }
+    }
+
+
+    public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException // Decodes every complete data block available; undecoded bytes are kept in _remainingBufs for the next call.
+    {
+
+        // get prior remaining data from accumulator
+        ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
+        DataInputStream msg;
+
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining()); // NOTE(review): buf.array() requires an array-backed, writable buffer (direct or read-only buffers throw) - confirm callers; buf's own position is not advanced here
+        if(!_remainingBufs.isEmpty())
+        {
+            _remainingBufs.add(bais); // leftovers exist: queue the new bytes behind them
+            msg = new DataInputStream(new RemainingByteArrayInputStream()); // read across all queued streams as one
+        }
+        else
+        {
+            msg = new DataInputStream(bais); // nothing pending: read the new bytes directly
+        }
+
+        boolean enoughData = true;
+        while (enoughData) // loop until the decoder reports an incomplete block
+        {
+            if(!_expectProtocolInitiation)
+            {
                 enoughData = _dataBlockDecoder.decodable(msg);
-                msg.position(pos);
                 if (enoughData)
                 {
                     dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
                 }
-                else
+            }
+            else
+            {
+                enoughData = _piDecoder.decodable(msg);
+                if (enoughData)
                 {
-                    _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
-                    _remainingBuf.setAutoExpand(true);
-                    _remainingBuf.put(msg);
+                    dataBlocks.add(new ProtocolInitiation(msg));
+                }
+
+            }
+
+            if(!enoughData) // runs once, on the final pass: store whatever could not be decoded
+            {
+                if(!_remainingBufs.isEmpty())
+                {
+                    _remainingBufs.remove(_remainingBufs.size()-1); // drop the stream for this call's buffer (it was appended last above)
+                    ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
+                    while(iterator.hasNext() && iterator.next().available() == 0) // discard leading streams that are fully consumed; stops at the first with data left
+                    {
+                        iterator.remove();
+                    }
+                }
+                if(bais.available()!=0)
+                {
+                    byte[] remaining = new byte[bais.available()]; // copy the unread tail of this call's bytes into its own array
+                    bais.read(remaining);
+                    _remainingBufs.add(new ByteArrayInputStream(remaining));
                 }
             }
-        }
-        if(firstDecode && dataBlocks.size() > 0)
-        {
-            firstDecode = false;
         }
         return dataBlocks;
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Fri Sep  9 17:47:22 2011
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.AMQException;
 
@@ -34,7 +36,7 @@ public interface AMQBody
      */
     public abstract int getSize();
     
-    public void writePayload(ByteBuffer buffer);
+    public void writePayload(DataOutputStream buffer) throws IOException;
     
-    void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
+    void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Fri Sep  9 17:47:22 2011
@@ -20,7 +20,10 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 
 /**
  * A data block represents something that has a size in bytes and the ability to write itself to a byte
@@ -39,25 +42,6 @@ public abstract class AMQDataBlock imple
      * Writes the datablock to the specified buffer.
      * @param buffer
      */
-    public abstract void writePayload(ByteBuffer buffer);
-
-    public ByteBuffer toByteBuffer()
-    {
-        final ByteBuffer buffer = ByteBuffer.allocate((int)getSize());
-
-        writePayload(buffer);    
-        buffer.flip();
-        return buffer;
-    }
-
-    public java.nio.ByteBuffer toNioByteBuffer()
-    {
-        final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
-
-        ByteBuffer buf = ByteBuffer.wrap(buffer);
-        writePayload(buf);    
-        buffer.flip();
-        return buffer;
-    }
+    public abstract void writePayload(DataOutputStream buffer) throws IOException;
 
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Fri Sep  9 17:47:22 2011
@@ -20,11 +20,12 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.IOException;
+
 public class AMQDataBlockDecoder
 {
 
@@ -42,27 +43,32 @@ public class AMQDataBlockDecoder
     public AMQDataBlockDecoder()
     { }
 
-    public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
+    public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException // Peeks at the frame header to tell whether a whole frame is buffered; the stream is left where it started.
     {
-        final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
+        final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1); // bytes left for the body once the 8 fixed framing bytes are accounted for
         // type, channel, body length and end byte
         if (remainingAfterAttributes < 0)
         {
             return false;
         }
 
-        in.position(in.position() + 1 + 2);
+        in.mark(8); // remember the frame start; 7 header bytes are consumed before reset()
+        in.skip(1 + 2); // step over type (1 byte) and channel (2 bytes); NOTE(review): skip()'s return value is not checked
+
+
         // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() 
-        final long bodySize = in.getInt() & 0xffffffffL; 
+        final long bodySize = in.readInt() & 0xffffffffL; // mask keeps the 32-bit length unsigned
+
+        in.reset(); // rewind to the mark so the caller reads the frame from its first byte
 
         return (remainingAfterAttributes >= bodySize);
 
     }
 
-    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
-        throws AMQFrameDecodingException, AMQProtocolVersionException
+    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
+            throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
     {
-        final byte type = in.get();
+        final byte type = in.readByte();
 
         BodyFactory bodyFactory;
         if (type == AMQMethodBody.TYPE)
@@ -79,8 +85,8 @@ public class AMQDataBlockDecoder
             throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
         }
 
-        final int channel = in.getUnsignedShort();
-        final long bodySize = in.getUnsignedInt();
+        final int channel = in.readUnsignedShort();
+        final long bodySize = EncodingUtils.readUnsignedInteger(in);
 
         // bodySize can be zero
         if ((channel < 0) || (bodySize < 0))
@@ -91,7 +97,7 @@ public class AMQDataBlockDecoder
 
         AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
 
-        byte marker = in.get();
+        byte marker = in.readByte();
         if ((marker & 0xFF) != 0xCE)
         {
             throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
@@ -101,13 +107,4 @@ public class AMQDataBlockDecoder
         return frame;
     }
 
-    public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
-    {
-        return decodable(msg.buf());
-    }
-
-    public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
-    {
-        return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
-    }
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Sep  9 17:47:22 2011
@@ -20,7 +20,9 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 
 public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
 {
@@ -36,7 +38,7 @@ public class AMQFrame extends AMQDataBlo
         _bodyFrame = bodyFrame;
     }
 
-    public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
+    public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
     {
         this._channel = channel;
         this._bodyFrame = bodyFactory.createBody(in,bodySize);
@@ -53,13 +55,13 @@ public class AMQFrame extends AMQDataBlo
     }
 
 
-    public void writePayload(ByteBuffer buffer)
+    public void writePayload(DataOutputStream buffer) throws IOException
     {
-        buffer.put(_bodyFrame.getFrameType());
+        buffer.writeByte(_bodyFrame.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, _channel);
         EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
         _bodyFrame.writePayload(buffer);
-        buffer.put(FRAME_END_BYTE);
+        buffer.writeByte(FRAME_END_BYTE);
     }
 
     public final int getChannel()
@@ -77,48 +79,48 @@ public class AMQFrame extends AMQDataBlo
         return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
     }
 
-    public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+    public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException
     {
-        buffer.put(body.getFrameType());
+        buffer.writeByte(body.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
         body.writePayload(buffer);
-        buffer.put(FRAME_END_BYTE);
+        buffer.writeByte(FRAME_END_BYTE);
 
     }
 
-    public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+    public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
     {
-        buffer.put(body1.getFrameType());
+        buffer.writeByte(body1.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
         body1.writePayload(buffer);
-        buffer.put(FRAME_END_BYTE);
-        buffer.put(body2.getFrameType());
+        buffer.writeByte(FRAME_END_BYTE);
+        buffer.writeByte(body2.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
         body2.writePayload(buffer);
-        buffer.put(FRAME_END_BYTE);
+        buffer.writeByte(FRAME_END_BYTE);
 
     }
 
-    public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+    public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
     {
-        buffer.put(body1.getFrameType());
+        buffer.writeByte(body1.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
         body1.writePayload(buffer);
-        buffer.put(FRAME_END_BYTE);
-        buffer.put(body2.getFrameType());
+        buffer.writeByte(FRAME_END_BYTE);
+        buffer.writeByte(body2.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
         body2.writePayload(buffer);
-        buffer.put(FRAME_END_BYTE);
-        buffer.put(body3.getFrameType());
+        buffer.writeByte(FRAME_END_BYTE);
+        buffer.writeByte(body3.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body3.getSize());
         body3.writePayload(buffer);
-        buffer.put(FRAME_END_BYTE);
+        buffer.writeByte(FRAME_END_BYTE);
 
     }
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Fri Sep  9 17:47:22 2011
@@ -20,12 +20,14 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 public interface AMQMethodBody extends AMQBody
 {
     public static final byte TYPE = 1;
@@ -43,12 +45,12 @@ public interface AMQMethodBody extends A
     /** @return unsigned short */
     public int getMethod();
 
-    public void writeMethodPayload(ByteBuffer buffer);
+    public void writeMethodPayload(DataOutputStream buffer) throws IOException;
 
 
     public int getSize();
 
-    public void writePayload(ByteBuffer buffer);
+    public void writePayload(DataOutputStream buffer) throws IOException;
 
     //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Fri Sep  9 17:47:22 2011
@@ -20,13 +20,14 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.IOException;
+
 public class AMQMethodBodyFactory implements BodyFactory
 {
     private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
@@ -38,7 +39,7 @@ public class AMQMethodBodyFactory implem
         _protocolSession = protocolSession;
     }
 
-    public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+    public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
     {
         return _protocolSession.getMethodRegistry().convertToBody(in, bodySize);
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Sep  9 17:47:22 2011
@@ -21,13 +21,16 @@ package org.apache.qpid.framing;
  *
  */
 
-import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
 public abstract class AMQMethodBodyImpl implements AMQMethodBody
 {
     public static final byte TYPE = 1;
@@ -98,7 +101,7 @@ public abstract class AMQMethodBodyImpl 
         return 2 + 2 + getBodySize();
     }
 
-        public void writePayload(ByteBuffer buffer)
+    public void writePayload(DataOutputStream buffer) throws IOException
     {
         EncodingUtils.writeUnsignedShort(buffer, getClazz());
         EncodingUtils.writeUnsignedShort(buffer, getMethod());
@@ -106,12 +109,12 @@ public abstract class AMQMethodBodyImpl 
     }
 
 
-    protected byte readByte(ByteBuffer buffer)
+    protected byte readByte(DataInputStream buffer) throws IOException
     {
-        return buffer.get();
+        return buffer.readByte();
     }
 
-    protected AMQShortString readAMQShortString(ByteBuffer buffer)
+    protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
     {
         return EncodingUtils.readAMQShortString(buffer);
     }
@@ -121,27 +124,27 @@ public abstract class AMQMethodBodyImpl 
         return EncodingUtils.encodedShortStringLength(string);
     }
 
-    protected void writeByte(ByteBuffer buffer, byte b)
+    protected void writeByte(DataOutputStream buffer, byte b) throws IOException
     {
-        buffer.put(b);
+        buffer.writeByte(b);
     }
 
-    protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
+    protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException
     {
         EncodingUtils.writeShortStringBytes(buffer, string);
     }
 
-    protected int readInt(ByteBuffer buffer)
+    protected int readInt(DataInputStream buffer) throws IOException
     {
-        return buffer.getInt();
+        return buffer.readInt();
     }
 
-    protected void writeInt(ByteBuffer buffer, int i)
+    protected void writeInt(DataOutputStream buffer, int i) throws IOException
     {
-        buffer.putInt(i);
+        buffer.writeInt(i);
     }
 
-    protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+    protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
     {
         return EncodingUtils.readFieldTable(buffer);
     }
@@ -151,19 +154,19 @@ public abstract class AMQMethodBodyImpl 
         return EncodingUtils.encodedFieldTableLength(table);  //To change body of created methods use File | Settings | File Templates.
     }
 
-    protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
+    protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException
     {
         EncodingUtils.writeFieldTableBytes(buffer, table);
     }
 
-    protected long readLong(ByteBuffer buffer)
+    protected long readLong(DataInputStream buffer) throws IOException
     {
-        return buffer.getLong();
+        return buffer.readLong();
     }
 
-    protected void writeLong(ByteBuffer buffer, long l)
+    protected void writeLong(DataOutputStream buffer, long l) throws IOException
     {
-        buffer.putLong(l);
+        buffer.writeLong(l);
     }
 
     protected int getSizeOf(byte[] response)
@@ -171,87 +174,86 @@ public abstract class AMQMethodBodyImpl 
         return (response == null) ? 4 : response.length + 4;
     }
 
-    protected void writeBytes(ByteBuffer buffer, byte[] data)
+    protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
     {
         EncodingUtils.writeBytes(buffer,data);
     }
 
-    protected byte[] readBytes(ByteBuffer buffer)
+    protected byte[] readBytes(DataInputStream buffer) throws IOException
     {
         return EncodingUtils.readBytes(buffer);
     }
 
-    protected short readShort(ByteBuffer buffer)
+    protected short readShort(DataInputStream buffer) throws IOException
     {
         return EncodingUtils.readShort(buffer);
     }
 
-    protected void writeShort(ByteBuffer buffer, short s)
+    protected void writeShort(DataOutputStream buffer, short s) throws IOException
     {
         EncodingUtils.writeShort(buffer, s);
     }
 
-    protected Content readContent(ByteBuffer buffer)
+    protected Content readContent(DataInputStream buffer)
     {
-        return null;  //To change body of created methods use File | Settings | File Templates.
+        return null;
     }
 
     protected int getSizeOf(Content body)
     {
-        return 0;  //To change body of created methods use File | Settings | File Templates.
+        return 0;
     }
 
-    protected void writeContent(ByteBuffer buffer, Content body)
+    protected void writeContent(DataOutputStream buffer, Content body)
     {
-        //To change body of created methods use File | Settings | File Templates.
     }
 
-    protected byte readBitfield(ByteBuffer buffer)
+    protected byte readBitfield(DataInputStream buffer) throws IOException
     {
-        return readByte(buffer);  //To change body of created methods use File | Settings | File Templates.
+        return readByte(buffer);
     }
 
-    protected int readUnsignedShort(ByteBuffer buffer)
+    protected int readUnsignedShort(DataInputStream buffer) throws IOException
     {
-        return buffer.getUnsignedShort();  //To change body of created methods use File | Settings | File Templates.
+        return buffer.readUnsignedShort();
     }
 
-    protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
+    protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException
     {
-        buffer.put(bitfield0);
+        buffer.writeByte(bitfield0);
     }
 
-    protected void writeUnsignedShort(ByteBuffer buffer, int s)
+    protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
     {
         EncodingUtils.writeUnsignedShort(buffer, s);
     }
 
-    protected long readUnsignedInteger(ByteBuffer buffer)
+    protected long readUnsignedInteger(DataInputStream buffer) throws IOException
     {
-        return buffer.getUnsignedInt();
+        return EncodingUtils.readUnsignedInteger(buffer);
     }
-    protected void writeUnsignedInteger(ByteBuffer buffer, long i)
+    protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException
     {
         EncodingUtils.writeUnsignedInteger(buffer, i);
     }
 
 
-    protected short readUnsignedByte(ByteBuffer buffer)
+    protected short readUnsignedByte(DataInputStream buffer) throws IOException
     {
-        return buffer.getUnsigned();
+        return (short) buffer.readUnsignedByte();
     }
 
-    protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
+    protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException
     {
         EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
     }
 
-    protected long readTimestamp(ByteBuffer buffer)
+    protected long readTimestamp(DataInputStream buffer) throws IOException
     {
         return EncodingUtils.readTimestamp(buffer);
     }
 
-    protected void writeTimestamp(ByteBuffer buffer, long t)
+    protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException
     {
         EncodingUtils.writeTimestamp(buffer, t);
     }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java Fri Sep  9 17:47:22 2011
@@ -21,10 +21,11 @@
 
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
 
 
 public abstract interface AMQMethodBodyInstanceFactory
 {
-    public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException;    
+    public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException;
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Fri Sep  9 17:47:22 2011
@@ -21,11 +21,12 @@
 
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.*;
 import java.lang.ref.WeakReference;
 
@@ -199,27 +200,16 @@ public final class AMQShortString implem
 
     }
 
-    private AMQShortString(ByteBuffer data, final int length)
+    private AMQShortString(DataInputStream data, final int length) throws IOException
     {
         if (length > MAX_LENGTH)
         {
             throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
         }
-        if(data.isDirect() || data.isReadOnly())
-        {
-            byte[] dataBytes = new byte[length];
-            data.get(dataBytes);
-            _data = dataBytes;
-            _offset = 0;
-        }
-        else
-        {
-
-            _data = data.array();
-            _offset = data.arrayOffset() + data.position();
-            data.skip(length);
-
-        }
+        byte[] dataBytes = new byte[length];
+        data.readFully(dataBytes); // not read(): read(byte[]) may return short, leaving a zero-filled tail and a misaligned stream
+        _data = dataBytes;
+        _offset = 0;
         _length = length;
 
     }
@@ -275,9 +265,9 @@ public final class AMQShortString implem
         return new CharSubSequence(start, end);
     }
 
-    public static AMQShortString readFromBuffer(ByteBuffer buffer)
+    public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException
     {
-        final short length = buffer.getUnsigned();
+        final int length = buffer.readUnsignedByte();
         if (length == 0)
         {
             return null;
@@ -303,13 +293,13 @@ public final class AMQShortString implem
         }
     }
 
-    public void writeToBuffer(ByteBuffer buffer)
+    public void writeToBuffer(DataOutputStream buffer) throws IOException
     {
 
         final int size = length();
         //buffer.setAutoExpand(true);
-        buffer.put((byte) size);
-        buffer.put(_data, _offset, size);
+        buffer.write((byte) size);
+        buffer.write(_data, _offset, size);
 
     }
 

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java Fri Sep  9 17:47:22 2011
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
-
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.math.BigDecimal;
 
 /**
@@ -60,12 +61,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -106,12 +107,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readUnsignedInteger(buffer);
         }
@@ -137,7 +138,7 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             BigDecimal bd = (BigDecimal) value;
 
@@ -150,7 +151,7 @@ public enum AMQType
             EncodingUtils.writeInteger(buffer, unscaled);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             byte places = EncodingUtils.readByte(buffer);
 
@@ -182,12 +183,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeLong(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readLong(buffer);
         }
@@ -246,7 +247,7 @@ public enum AMQType
          * @param value  An instance of the type.
          * @param buffer The byte buffer to write it to.
          */
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             // Ensure that the value is a FieldTable.
             if (!(value instanceof FieldTable))
@@ -267,7 +268,7 @@ public enum AMQType
          *
          * @return An instance of the type.
          */
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             try
             {
@@ -301,10 +302,10 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer)
         { }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer)
         {
             return null;
         }
@@ -330,12 +331,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeLongstr(buffer, (byte[]) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readLongstr(buffer);
         }
@@ -360,12 +361,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -391,12 +392,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -426,12 +427,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeBoolean(buffer, (Boolean) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readBoolean(buffer);
         }
@@ -461,12 +462,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeChar(buffer, (Character) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readChar(buffer);
         }
@@ -496,12 +497,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeByte(buffer, (Byte) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readByte(buffer);
         }
@@ -535,12 +536,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeShort(buffer, (Short) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readShort(buffer);
         }
@@ -577,12 +578,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeInteger(buffer, (Integer) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readInteger(buffer);
         }
@@ -624,12 +625,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeLong(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readLong(buffer);
         }
@@ -659,12 +660,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeFloat(buffer, (Float) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readFloat(buffer);
         }
@@ -698,12 +699,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, ByteBuffer buffer)
+        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
         {
             EncodingUtils.writeDouble(buffer, (Double) value);
         }
 
-        public Object readValueFromBuffer(ByteBuffer buffer)
+        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
         {
             return EncodingUtils.readDouble(buffer);
         }
@@ -770,9 +771,9 @@ public enum AMQType
      * @param value  An instance of the type.
      * @param buffer The byte buffer to write it to.
      */
-    public void writeToBuffer(Object value, ByteBuffer buffer)
+    public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException
     {
-        buffer.put(identifier());
+        buffer.writeByte(identifier());
         writeValueImpl(value, buffer);
     }
 
@@ -782,7 +783,7 @@ public enum AMQType
      * @param value  An instance of the type.
      * @param buffer The byte buffer to write it to.
      */
-    abstract void writeValueImpl(Object value, ByteBuffer buffer);
+    abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException;
 
     /**
      * Reads an instance of the type from a specified byte buffer.
@@ -791,5 +792,5 @@ public enum AMQType
      *
      * @return An instance of the type.
      */
-    abstract Object readValueFromBuffer(ByteBuffer buffer);
+    abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException;
 }

Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Fri Sep  9 17:47:22 2011
@@ -20,8 +20,9 @@
  */
 package org.apache.qpid.framing;
 
-import org.apache.mina.common.ByteBuffer;
-
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
 import java.util.Date;
 import java.util.Map;
 import java.math.BigDecimal;
@@ -60,7 +61,7 @@ public class AMQTypedValue
         _value = type.toNativeValue(value);
     }
 
-    private AMQTypedValue(AMQType type, ByteBuffer buffer)
+    private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException
     {
         _type = type;
         _value = type.readValueFromBuffer(buffer);
@@ -76,7 +77,7 @@ public class AMQTypedValue
         return _value;
     }
 
-    public void writeToBuffer(ByteBuffer buffer)
+    public void writeToBuffer(DataOutputStream buffer) throws IOException
     {
         _type.writeToBuffer(_value, buffer);
     }
@@ -86,9 +87,9 @@ public class AMQTypedValue
         return _type.getEncodingSize(_value);
     }
 
-    public static AMQTypedValue readFromBuffer(ByteBuffer buffer)
+    public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException
     {
-        AMQType type = AMQTypeMap.getType(buffer.get());
+        AMQType type = AMQTypeMap.getType(buffer.readByte());
 
         return new AMQTypedValue(type, buffer);
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org