You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/09/09 19:47:27 UTC
svn commit: r1167311 [3/5] - in /qpid/trunk/qpid/java:
broker/src/main/java/org/apache/qpid/server/exchange/headers/
broker/src/main/java/org/apache/qpid/server/message/
broker/src/main/java/org/apache/qpid/server/output/amqp0_8/
broker/src/main/java/o...
Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java?rev=1167311&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentReader.java Fri Sep 9 17:47:22 2011
@@ -0,0 +1,674 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetDecoder;
+
+class TypedBytesContentReader implements TypedBytesCodes
+{
+
+ private final ByteBuffer _data;
+ private final int _position;
+ private final int _limit;
+
+
+ private static final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
+ private final CharsetDecoder _charsetDecoder = UTF8_CHARSET.newDecoder();
+
+ private int _byteArrayRemaining = -1;
+
+
+ public TypedBytesContentReader(final ByteBuffer data)
+ {
+ _data = data.duplicate();
+ _position = _data.position();
+ _limit = _data.limit();
+ }
+
+ /**
+ * Check that there is at least a certain number of bytes available to read
+ *
+ * @param len the number of bytes
+ * @throws javax.jms.MessageEOFException if there are less than len bytes available to read
+ */
+ protected void checkAvailable(int len) throws MessageEOFException
+ {
+ if (_data.remaining() < len)
+ {
+ throw new MessageEOFException("Unable to read " + len + " bytes");
+ }
+ }
+
+ protected byte readWireType() throws MessageFormatException, MessageEOFException,
+ MessageNotReadableException
+ {
+ checkAvailable(1);
+ return _data.get();
+ }
+
+ protected boolean readBoolean() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ boolean result;
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ boolean readBooleanImpl()
+ {
+ return _data.get() != 0;
+ }
+
+ protected byte readByte() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ byte result;
+ try
+ {
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
+ }
+
+ byte readByteImpl()
+ {
+ return _data.get();
+ }
+
+ protected short readShort() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ short result;
+ try
+ {
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ return result;
+ }
+
+ short readShortImpl()
+ {
+ return _data.getShort();
+ }
+
+ /**
+ * Note that this method reads a unicode character as two bytes from the stream
+ *
+ * @return the character read from the stream
+ * @throws javax.jms.JMSException
+ */
+ protected char readChar() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ try
+ {
+ if (wireType == NULL_STRING_TYPE)
+ {
+ throw new NullPointerException();
+ }
+
+ if (wireType != CHAR_TYPE)
+ {
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ char readCharImpl()
+ {
+ return _data.getChar();
+ }
+
+ protected int readInt() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ int result;
+ try
+ {
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected int readIntImpl()
+ {
+ return _data.getInt();
+ }
+
+ protected long readLong() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ long result;
+ try
+ {
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ long readLongImpl()
+ {
+ return _data.getLong();
+ }
+
+ protected float readFloat() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ float result;
+ try
+ {
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ float readFloatImpl()
+ {
+ return _data.getFloat();
+ }
+
+ protected double readDouble() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ double result;
+ try
+ {
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ double readDoubleImpl()
+ {
+ return _data.getDouble();
+ }
+
+ protected String readString() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ String result;
+ try
+ {
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ throw new NullPointerException("data is null");
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ protected String readStringImpl() throws JMSException
+ {
+ try
+ {
+ _charsetDecoder.reset();
+ ByteBuffer dup = _data.duplicate();
+ int pos = _data.position();
+ byte b;
+ while((b = _data.get()) != 0);
+ dup.limit(_data.position()-1);
+ return _charsetDecoder.decode(dup).toString();
+
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ }
+
+ protected int readBytes(byte[] bytes) throws JMSException
+ {
+ if (bytes == null)
+ {
+ throw new IllegalArgumentException("byte array must not be null");
+ }
+ // first call
+ if (_byteArrayRemaining == -1)
+ {
+ // type discriminator checked separately so you get a MessageFormatException rather than
+ // an EOF even in the case where both would be applicable
+ checkAvailable(1);
+ byte wireType = readWireType();
+ if (wireType != BYTEARRAY_TYPE)
+ {
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
+ }
+ checkAvailable(4);
+ int size = _data.getInt();
+ // length of -1 indicates null
+ if (size == -1)
+ {
+ return -1;
+ }
+ else
+ {
+ if (size > _data.remaining())
+ {
+ throw new MessageEOFException("Byte array has stated length "
+ + size
+ + " but message only contains "
+ +
+ _data.remaining()
+ + " bytes");
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ }
+ }
+ }
+ else if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ return -1;
+ }
+
+ int returnedSize = readBytesImpl(bytes);
+ if (returnedSize < bytes.length)
+ {
+ _byteArrayRemaining = -1;
+ }
+ return returnedSize;
+ }
+
+ private int readBytesImpl(byte[] bytes)
+ {
+ int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
+ _byteArrayRemaining -= count;
+
+ if (count == 0)
+ {
+ return 0;
+ }
+ else
+ {
+ _data.get(bytes, 0, count);
+ return count;
+ }
+ }
+
+ protected Object readObject() throws JMSException
+ {
+ int position = _data.position();
+ byte wireType = readWireType();
+ Object result = null;
+ try
+ {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ byte[] bytesResult = new byte[size];
+ readBytesImpl(bytesResult);
+ result = bytesResult;
+ }
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
+ }
+ }
+
+ public void reset()
+ {
+ _byteArrayRemaining = -1;
+ _data.position(_position);
+ _data.limit(_limit);
+ }
+
+ public ByteBuffer getData()
+ {
+ ByteBuffer buf = _data.duplicate();
+ buf.position(_position);
+ buf.limit(_limit);
+ return buf;
+ }
+
+ public long size()
+ {
+ return _limit - _position;
+ }
+
+ public int remaining()
+ {
+ return _data.remaining();
+ }
+
+ public void readRawBytes(final byte[] bytes, final int offset, final int count)
+ {
+ _data.get(bytes, offset, count);
+ }
+
+ public String readLengthPrefixedUTF() throws JMSException
+ {
+ try
+ {
+ short length = readShortImpl();
+ if(length == 0)
+ {
+ return "";
+ }
+ else
+ {
+ _charsetDecoder.reset();
+ ByteBuffer encodedString = _data.slice();
+ encodedString.limit(length);
+ _data.position(_data.position()+length);
+ CharBuffer string = _charsetDecoder.decode(encodedString);
+
+ return string.toString();
+ }
+ }
+ catch(CharacterCodingException e)
+ {
+ JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ }
+}
Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java?rev=1167311&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/TypedBytesContentWriter.java Fri Sep 9 17:47:22 2011
@@ -0,0 +1,370 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.CharacterCodingException;
+import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+
+class TypedBytesContentWriter implements TypedBytesCodes
+{
+ private final ByteArrayOutputStream _baos = new ByteArrayOutputStream();
+ private final DataOutputStream _data = new DataOutputStream(_baos);
+ private static final Charset UTF8 = Charset.forName("UTF-8");
+
+ protected void writeTypeDiscriminator(byte type) throws JMSException
+ {
+ try
+ {
+ _data.writeByte(type);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ private JMSException handle(final IOException e)
+ {
+ JMSException jmsEx = new JMSException("Unable to write value: " + e.getMessage());
+ jmsEx.setLinkedException(e);
+ return jmsEx;
+ }
+
+
+ protected void writeBoolean(boolean b) throws JMSException
+ {
+ writeTypeDiscriminator(BOOLEAN_TYPE);
+ writeBooleanImpl(b);
+ }
+
+ public void writeBooleanImpl(final boolean b) throws JMSException
+ {
+ try
+ {
+ _data.writeByte(b ? (byte) 1 : (byte) 0);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeByte(byte b) throws JMSException
+ {
+ writeTypeDiscriminator(BYTE_TYPE);
+ writeByteImpl(b);
+ }
+
+ public void writeByteImpl(final byte b) throws JMSException
+ {
+ try
+ {
+ _data.writeByte(b);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeShort(short i) throws JMSException
+ {
+ writeTypeDiscriminator(SHORT_TYPE);
+ writeShortImpl(i);
+ }
+
+ public void writeShortImpl(final short i) throws JMSException
+ {
+ try
+ {
+ _data.writeShort(i);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeChar(char c) throws JMSException
+ {
+ writeTypeDiscriminator(CHAR_TYPE);
+ writeCharImpl(c);
+ }
+
+ public void writeCharImpl(final char c) throws JMSException
+ {
+ try
+ {
+ _data.writeChar(c);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeInt(int i) throws JMSException
+ {
+ writeTypeDiscriminator(INT_TYPE);
+ writeIntImpl(i);
+ }
+
+ protected void writeIntImpl(int i) throws JMSException
+ {
+ try
+ {
+ _data.writeInt(i);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeLong(long l) throws JMSException
+ {
+ writeTypeDiscriminator(LONG_TYPE);
+ writeLongImpl(l);
+ }
+
+ public void writeLongImpl(final long l) throws JMSException
+ {
+ try
+ {
+ _data.writeLong(l);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeFloat(float v) throws JMSException
+ {
+ writeTypeDiscriminator(FLOAT_TYPE);
+ writeFloatImpl(v);
+ }
+
+ public void writeFloatImpl(final float v) throws JMSException
+ {
+ try
+ {
+ _data.writeFloat(v);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeDouble(double v) throws JMSException
+ {
+ writeTypeDiscriminator(DOUBLE_TYPE);
+ writeDoubleImpl(v);
+ }
+
+ public void writeDoubleImpl(final double v) throws JMSException
+ {
+ try
+ {
+ _data.writeDouble(v);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ protected void writeString(String string) throws JMSException
+ {
+ if (string == null)
+ {
+ writeTypeDiscriminator(NULL_STRING_TYPE);
+ }
+ else
+ {
+ writeTypeDiscriminator(STRING_TYPE);
+ writeNullTerminatedStringImpl(string);
+ }
+ }
+
+ protected void writeNullTerminatedStringImpl(String string)
+ throws JMSException
+ {
+ try
+ {
+ _data.write(string.getBytes(UTF8));
+ _data.writeByte((byte) 0);
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+
+ }
+
+ protected void writeBytes(byte[] bytes) throws JMSException
+ {
+ writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
+ }
+
+ protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
+ {
+ writeTypeDiscriminator(BYTEARRAY_TYPE);
+ writeBytesImpl(bytes, offset, length);
+ }
+
+ public void writeBytesImpl(final byte[] bytes, final int offset, final int length) throws JMSException
+ {
+ try
+ {
+ if (bytes == null)
+ {
+ _data.writeInt(-1);
+ }
+ else
+ {
+ _data.writeInt(length);
+ _data.write(bytes, offset, length);
+ }
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+ public void writeBytesRaw(final byte[] bytes, final int offset, final int length) throws JMSException
+ {
+ try
+ {
+ if (bytes != null)
+ {
+ _data.write(bytes, offset, length);
+ }
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+ }
+
+
+ protected void writeObject(Object object) throws JMSException
+ {
+ Class clazz;
+
+ if (object == null)
+ {
+ // string handles the output of null values
+ clazz = String.class;
+ }
+ else
+ {
+ clazz = object.getClass();
+ }
+
+ if (clazz == Byte.class)
+ {
+ writeByte((Byte) object);
+ }
+ else if (clazz == Boolean.class)
+ {
+ writeBoolean((Boolean) object);
+ }
+ else if (clazz == byte[].class)
+ {
+ writeBytes((byte[]) object);
+ }
+ else if (clazz == Short.class)
+ {
+ writeShort((Short) object);
+ }
+ else if (clazz == Character.class)
+ {
+ writeChar((Character) object);
+ }
+ else if (clazz == Integer.class)
+ {
+ writeInt((Integer) object);
+ }
+ else if (clazz == Long.class)
+ {
+ writeLong((Long) object);
+ }
+ else if (clazz == Float.class)
+ {
+ writeFloat((Float) object);
+ }
+ else if (clazz == Double.class)
+ {
+ writeDouble((Double) object);
+ }
+ else if (clazz == String.class)
+ {
+ writeString((String) object);
+ }
+ else
+ {
+ throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
+ }
+ }
+
+ public ByteBuffer getData()
+ {
+ return ByteBuffer.wrap(_baos.toByteArray());
+ }
+
+ public void writeLengthPrefixedUTF(final String string) throws JMSException
+ {
+ try
+ {
+ CharsetEncoder encoder = UTF8.newEncoder();
+ java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
+
+ writeShortImpl((short) encodedString.limit());
+ while(encodedString.hasRemaining())
+ {
+ _data.writeByte(encodedString.get());
+ }
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException jmse = new JMSException("Unable to encode string: " + e);
+ jmse.setLinkedException(e);
+ jmse.initCause(e);
+ throw jmse;
+ }
+ catch (IOException e)
+ {
+ throw handle(e);
+ }
+
+ }
+}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Fri Sep 9 17:47:22 2011
@@ -87,9 +87,9 @@ public class UnprocessedMessage_0_8 exte
public void receiveBody(ContentBody body)
{
- if (body.payload != null)
+ if (body._payload != null)
{
- final long payloadSize = body.payload.remaining();
+ final long payloadSize = body._payload.length;
if (_bodies == null)
{
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Fri Sep 9 17:47:22 2011
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.client.protocol;
+import java.io.DataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -524,7 +526,7 @@ public class AMQProtocolHandler implemen
public synchronized void writeFrame(AMQDataBlock frame, boolean wait)
{
- final ByteBuffer buf = frame.toNioByteBuffer();
+ final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
_sender.send(buf);
_sender.flush();
@@ -547,6 +549,39 @@ public class AMQProtocolHandler implemen
}
+ private ByteBuffer asByteBuffer(AMQDataBlock block)
+ {
+ final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+
+ try
+ {
+ block.writePayload(new DataOutputStream(new OutputStream()
+ {
+
+
+ @Override
+ public void write(int b) throws IOException
+ {
+ buf.put((byte) b);
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException
+ {
+ buf.put(b, off, len);
+ }
+ }));
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+
+ buf.flip();
+ return buf;
+ }
+
+
/**
* Convenience method that writes a frame to the protocol session and waits for a particular response. Equivalent to
* calling getProtocolSession().write() then waiting for the response.
Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/util/ClassLoadingAwareObjectInputStreamTest.java Fri Sep 9 17:47:22 2011
@@ -20,13 +20,13 @@
*/
package org.apache.qpid.client.util;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.ObjectOutputStream;
-import java.lang.reflect.Proxy;
import java.util.Arrays;
import java.util.List;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.test.utils.QpidTestCase;
public class ClassLoadingAwareObjectInputStreamTest extends QpidTestCase
@@ -37,16 +37,15 @@ public class ClassLoadingAwareObjectInpu
protected void setUp() throws Exception
{
//Create a viable input stream for instantiating the CLA OIS
- ByteBuffer buf = ByteBuffer.allocate(10);
- buf.setAutoExpand(true);
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
- ObjectOutputStream out = new ObjectOutputStream(buf.asOutputStream());
+ ObjectOutputStream out = new ObjectOutputStream(baos);
out.writeObject("testString");
out.flush();
out.close();
- buf.rewind();
- _in = buf.asInputStream();
+
+ _in = new ByteArrayInputStream(baos.toByteArray());
_claOIS = new ClassLoadingAwareObjectInputStream(_in);
}
Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/ObjectMessageUnitTest.java Fri Sep 9 17:47:22 2011
@@ -45,8 +45,6 @@ public class ObjectMessageUnitTest exten
_om.setObject(true);
//make the message readable
- _om.reset();
-
Object object = _om.getObject();
assertTrue("Unexpected type returned", object instanceof Boolean);
@@ -61,8 +59,6 @@ public class ObjectMessageUnitTest exten
_om.setObject("test string");
//make the message readable
- _om.reset();
-
Object object = _om.getObject();
assertTrue("Unexpected type returned", object instanceof String);
@@ -87,7 +83,6 @@ public class ObjectMessageUnitTest exten
list.add(0);
//make the message readable
- _om.reset();
//retrieve the Object
Object object = _om.getObject();
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Sep 9 17:47:22 2011
@@ -20,10 +20,9 @@
*/
package org.apache.qpid.codec;
-import java.util.ArrayList;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.SimpleByteBufferAllocator;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
@@ -61,12 +60,11 @@ public class AMQDecoder
/** Flag to indicate whether this decoder needs to handle protocol initiation. */
private boolean _expectProtocolInitiation;
- private boolean firstDecode = true;
private AMQMethodBodyFactory _bodyFactory;
- private ByteBuffer _remainingBuf;
-
+ private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
+
/**
* Creates a new AMQP decoder.
*
@@ -92,62 +90,168 @@ public class AMQDecoder
_expectProtocolInitiation = expectProtocolInitiation;
}
+ private class RemainingByteArrayInputStream extends InputStream
+ {
+ private int _currentListPos;
+ private int _markPos;
- private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
- public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
- {
+ @Override
+ public int read() throws IOException
+ {
+ ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
+ if(currentStream.available() > 0)
+ {
+ return currentStream.read();
+ }
+ else if((_currentListPos == _remainingBufs.size())
+ || (++_currentListPos == _remainingBufs.size()))
+ {
+ return -1;
+ }
+ else
+ {
- // get prior remaining data from accumulator
- ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
- ByteBuffer msg;
- // if we have a session buffer, append data to that otherwise
- // use the buffer read from the network directly
- if( _remainingBuf != null )
- {
- _remainingBuf.put(buf);
- _remainingBuf.flip();
- msg = _remainingBuf;
+ ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
+ stream.mark(0);
+ return stream.read();
+ }
}
- else
+
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException
{
- msg = ByteBuffer.wrap(buf);
+
+ if(_currentListPos == _remainingBufs.size())
+ {
+ return -1;
+ }
+ else
+ {
+ ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
+ final int available = currentStream.available();
+ int read = currentStream.read(b, off, len > available ? available : len);
+ if(read < len)
+ {
+ if(_currentListPos++ != _remainingBufs.size())
+ {
+ _remainingBufs.get(_currentListPos).mark(0);
+ }
+ int correctRead = read == -1 ? 0 : read;
+ int subRead = read(b, off+correctRead, len-correctRead);
+ if(subRead == -1)
+ {
+ return read;
+ }
+ else
+ {
+ return correctRead+subRead;
+ }
+ }
+ else
+ {
+ return len;
+ }
+ }
}
-
- if (_expectProtocolInitiation
- || (firstDecode
- && (msg.remaining() > 0)
- && (msg.get(msg.position()) == (byte)'A')))
+
+ @Override
+ public int available() throws IOException
{
- if (_piDecoder.decodable(msg.buf()))
+ int total = 0;
+ for(int i = _currentListPos; i < _remainingBufs.size(); i++)
{
- dataBlocks.add(new ProtocolInitiation(msg.buf()));
+ total += _remainingBufs.get(i).available();
}
+ return total;
}
- else
+
+ @Override
+ public void mark(final int readlimit)
{
- boolean enoughData = true;
- while (enoughData)
+ _markPos = _currentListPos;
+ final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
+ if(stream != null)
{
- int pos = msg.position();
+ stream.mark(readlimit);
+ }
+ }
+ @Override
+ public void reset() throws IOException
+ {
+ _currentListPos = _markPos;
+ final int size = _remainingBufs.size();
+ if(_currentListPos < size)
+ {
+ _remainingBufs.get(_currentListPos).reset();
+ }
+ for(int i = _currentListPos+1; i<size; i++)
+ {
+ _remainingBufs.get(i).reset();
+ }
+ }
+ }
+
+
+ public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+ {
+
+ // get prior remaining data from accumulator
+ ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
+ DataInputStream msg;
+
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
+ if(!_remainingBufs.isEmpty())
+ {
+ _remainingBufs.add(bais);
+ msg = new DataInputStream(new RemainingByteArrayInputStream());
+ }
+ else
+ {
+ msg = new DataInputStream(bais);
+ }
+
+ boolean enoughData = true;
+ while (enoughData)
+ {
+ if(!_expectProtocolInitiation)
+ {
enoughData = _dataBlockDecoder.decodable(msg);
- msg.position(pos);
if (enoughData)
{
dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
}
- else
+ }
+ else
+ {
+ enoughData = _piDecoder.decodable(msg);
+ if (enoughData)
{
- _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
- _remainingBuf.setAutoExpand(true);
- _remainingBuf.put(msg);
+ dataBlocks.add(new ProtocolInitiation(msg));
+ }
+
+ }
+
+ if(!enoughData)
+ {
+ if(!_remainingBufs.isEmpty())
+ {
+ _remainingBufs.remove(_remainingBufs.size()-1);
+ ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
+ while(iterator.hasNext() && iterator.next().available() == 0)
+ {
+ iterator.remove();
+ }
+ }
+ if(bais.available()!=0)
+ {
+ byte[] remaining = new byte[bais.available()];
+ bais.read(remaining);
+ _remainingBufs.add(new ByteArrayInputStream(remaining));
}
}
- }
- if(firstDecode && dataBlocks.size() > 0)
- {
- firstDecode = false;
}
return dataBlocks;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Fri Sep 9 17:47:22 2011
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -34,7 +36,7 @@ public interface AMQBody
*/
public abstract int getSize();
- public void writePayload(ByteBuffer buffer);
+ public void writePayload(DataOutputStream buffer) throws IOException;
- void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
+ void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Fri Sep 9 17:47:22 2011
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
/**
* A data block represents something that has a size in bytes and the ability to write itself to a byte
@@ -39,25 +42,6 @@ public abstract class AMQDataBlock imple
* Writes the datablock to the specified buffer.
* @param buffer
*/
- public abstract void writePayload(ByteBuffer buffer);
-
- public ByteBuffer toByteBuffer()
- {
- final ByteBuffer buffer = ByteBuffer.allocate((int)getSize());
-
- writePayload(buffer);
- buffer.flip();
- return buffer;
- }
-
- public java.nio.ByteBuffer toNioByteBuffer()
- {
- final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
-
- ByteBuffer buf = ByteBuffer.wrap(buffer);
- writePayload(buf);
- buffer.flip();
- return buffer;
- }
+ public abstract void writePayload(DataOutputStream buffer) throws IOException;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Fri Sep 9 17:47:22 2011
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.IOException;
+
public class AMQDataBlockDecoder
{
@@ -42,27 +43,32 @@ public class AMQDataBlockDecoder
public AMQDataBlockDecoder()
{ }
- public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
+ public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
{
- final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
+ final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
if (remainingAfterAttributes < 0)
{
return false;
}
- in.position(in.position() + 1 + 2);
+ in.mark(8);
+ in.skip(1 + 2);
+
+
// Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
- final long bodySize = in.getInt() & 0xffffffffL;
+ final long bodySize = in.readInt() & 0xffffffffL;
+
+ in.reset();
return (remainingAfterAttributes >= bodySize);
}
- public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
- final byte type = in.get();
+ final byte type = in.readByte();
BodyFactory bodyFactory;
if (type == AMQMethodBody.TYPE)
@@ -79,8 +85,8 @@ public class AMQDataBlockDecoder
throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
}
- final int channel = in.getUnsignedShort();
- final long bodySize = in.getUnsignedInt();
+ final int channel = in.readUnsignedShort();
+ final long bodySize = EncodingUtils.readUnsignedInteger(in);
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
@@ -91,7 +97,7 @@ public class AMQDataBlockDecoder
AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
- byte marker = in.get();
+ byte marker = in.readByte();
if ((marker & 0xFF) != 0xCE)
{
throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
@@ -101,13 +107,4 @@ public class AMQDataBlockDecoder
return frame;
}
- public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
- {
- return decodable(msg.buf());
- }
-
- public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
- {
- return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
- }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Sep 9 17:47:22 2011
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -36,7 +38,7 @@ public class AMQFrame extends AMQDataBlo
_bodyFrame = bodyFrame;
}
- public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
+ public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
{
this._channel = channel;
this._bodyFrame = bodyFactory.createBody(in,bodySize);
@@ -53,13 +55,13 @@ public class AMQFrame extends AMQDataBlo
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
- buffer.put(_bodyFrame.getFrameType());
+ buffer.writeByte(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, _channel);
EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
_bodyFrame.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
+ buffer.writeByte(FRAME_END_BYTE);
}
public final int getChannel()
@@ -77,48 +79,48 @@ public class AMQFrame extends AMQDataBlo
return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
- public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+ public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException
{
- buffer.put(body.getFrameType());
+ buffer.writeByte(body.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
body.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
+ buffer.writeByte(FRAME_END_BYTE);
}
- public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+ public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
{
- buffer.put(body1.getFrameType());
+ buffer.writeByte(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
body1.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
- buffer.put(body2.getFrameType());
+ buffer.writeByte(FRAME_END_BYTE);
+ buffer.writeByte(body2.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
body2.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
+ buffer.writeByte(FRAME_END_BYTE);
}
- public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+ public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
{
- buffer.put(body1.getFrameType());
+ buffer.writeByte(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
body1.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
- buffer.put(body2.getFrameType());
+ buffer.writeByte(FRAME_END_BYTE);
+ buffer.writeByte(body2.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
body2.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
- buffer.put(body3.getFrameType());
+ buffer.writeByte(FRAME_END_BYTE);
+ buffer.writeByte(body3.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body3.getSize());
body3.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
+ buffer.writeByte(FRAME_END_BYTE);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Fri Sep 9 17:47:22 2011
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public interface AMQMethodBody extends AMQBody
{
public static final byte TYPE = 1;
@@ -43,12 +45,12 @@ public interface AMQMethodBody extends A
/** @return unsigned short */
public int getMethod();
- public void writeMethodPayload(ByteBuffer buffer);
+ public void writeMethodPayload(DataOutputStream buffer) throws IOException;
public int getSize();
- public void writePayload(ByteBuffer buffer);
+ public void writePayload(DataOutputStream buffer) throws IOException;
//public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Fri Sep 9 17:47:22 2011
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.IOException;
+
public class AMQMethodBodyFactory implements BodyFactory
{
private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
@@ -38,7 +39,7 @@ public class AMQMethodBodyFactory implem
_protocolSession = protocolSession;
}
- public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
{
return _protocolSession.getMethodRegistry().convertToBody(in, bodySize);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Sep 9 17:47:22 2011
@@ -21,13 +21,16 @@ package org.apache.qpid.framing;
*
*/
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
public static final byte TYPE = 1;
@@ -98,7 +101,7 @@ public abstract class AMQMethodBodyImpl
return 2 + 2 + getBodySize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, getClazz());
EncodingUtils.writeUnsignedShort(buffer, getMethod());
@@ -106,12 +109,12 @@ public abstract class AMQMethodBodyImpl
}
- protected byte readByte(ByteBuffer buffer)
+ protected byte readByte(DataInputStream buffer) throws IOException
{
- return buffer.get();
+ return buffer.readByte();
}
- protected AMQShortString readAMQShortString(ByteBuffer buffer)
+ protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
{
return EncodingUtils.readAMQShortString(buffer);
}
@@ -121,27 +124,27 @@ public abstract class AMQMethodBodyImpl
return EncodingUtils.encodedShortStringLength(string);
}
- protected void writeByte(ByteBuffer buffer, byte b)
+ protected void writeByte(DataOutputStream buffer, byte b) throws IOException
{
- buffer.put(b);
+ buffer.writeByte(b);
}
- protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
+ protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException
{
EncodingUtils.writeShortStringBytes(buffer, string);
}
- protected int readInt(ByteBuffer buffer)
+ protected int readInt(DataInputStream buffer) throws IOException
{
- return buffer.getInt();
+ return buffer.readInt();
}
- protected void writeInt(ByteBuffer buffer, int i)
+ protected void writeInt(DataOutputStream buffer, int i) throws IOException
{
- buffer.putInt(i);
+ buffer.writeInt(i);
}
- protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
{
return EncodingUtils.readFieldTable(buffer);
}
@@ -151,19 +154,19 @@ public abstract class AMQMethodBodyImpl
return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
}
- protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
+ protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException
{
EncodingUtils.writeFieldTableBytes(buffer, table);
}
- protected long readLong(ByteBuffer buffer)
+ protected long readLong(DataInputStream buffer) throws IOException
{
- return buffer.getLong();
+ return buffer.readLong();
}
- protected void writeLong(ByteBuffer buffer, long l)
+ protected void writeLong(DataOutputStream buffer, long l) throws IOException
{
- buffer.putLong(l);
+ buffer.writeLong(l);
}
protected int getSizeOf(byte[] response)
@@ -171,87 +174,86 @@ public abstract class AMQMethodBodyImpl
return (response == null) ? 4 : response.length + 4;
}
- protected void writeBytes(ByteBuffer buffer, byte[] data)
+ protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
{
EncodingUtils.writeBytes(buffer,data);
}
- protected byte[] readBytes(ByteBuffer buffer)
+ protected byte[] readBytes(DataInputStream buffer) throws IOException
{
return EncodingUtils.readBytes(buffer);
}
- protected short readShort(ByteBuffer buffer)
+ protected short readShort(DataInputStream buffer) throws IOException
{
return EncodingUtils.readShort(buffer);
}
- protected void writeShort(ByteBuffer buffer, short s)
+ protected void writeShort(DataOutputStream buffer, short s) throws IOException
{
EncodingUtils.writeShort(buffer, s);
}
- protected Content readContent(ByteBuffer buffer)
+ protected Content readContent(DataInputStream buffer)
{
- return null; //To change body of created methods use File | Settings | File Templates.
+ return null;
}
protected int getSizeOf(Content body)
{
- return 0; //To change body of created methods use File | Settings | File Templates.
+ return 0;
}
- protected void writeContent(ByteBuffer buffer, Content body)
+ protected void writeContent(DataOutputStream buffer, Content body)
{
- //To change body of created methods use File | Settings | File Templates.
}
- protected byte readBitfield(ByteBuffer buffer)
+ protected byte readBitfield(DataInputStream buffer) throws IOException
{
- return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
+ return readByte(buffer);
}
- protected int readUnsignedShort(ByteBuffer buffer)
+ protected int readUnsignedShort(DataInputStream buffer) throws IOException
{
- return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
+ return buffer.readUnsignedShort();
}
- protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
+ protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException
{
- buffer.put(bitfield0);
+ buffer.writeByte(bitfield0);
}
- protected void writeUnsignedShort(ByteBuffer buffer, int s)
+ protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, s);
}
- protected long readUnsignedInteger(ByteBuffer buffer)
+ protected long readUnsignedInteger(DataInputStream buffer) throws IOException
{
- return buffer.getUnsignedInt();
+ return EncodingUtils.readUnsignedInteger(buffer);
}
- protected void writeUnsignedInteger(ByteBuffer buffer, long i)
+ protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException
{
EncodingUtils.writeUnsignedInteger(buffer, i);
}
- protected short readUnsignedByte(ByteBuffer buffer)
+ protected short readUnsignedByte(DataInputStream buffer) throws IOException
{
- return buffer.getUnsigned();
+ return (short) buffer.readUnsignedByte();
}
- protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
+ protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException
{
EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
}
- protected long readTimestamp(ByteBuffer buffer)
+ protected long readTimestamp(DataInputStream buffer) throws IOException
{
return EncodingUtils.readTimestamp(buffer);
}
- protected void writeTimestamp(ByteBuffer buffer, long t)
+ protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException
{
EncodingUtils.writeTimestamp(buffer, t);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java Fri Sep 9 17:47:22 2011
@@ -21,10 +21,11 @@
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
public abstract interface AMQMethodBodyInstanceFactory
{
- public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Fri Sep 9 17:47:22 2011
@@ -21,11 +21,12 @@
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.*;
import java.lang.ref.WeakReference;
@@ -199,27 +200,16 @@ public final class AMQShortString implem
}
- private AMQShortString(ByteBuffer data, final int length)
+ private AMQShortString(DataInputStream data, final int length) throws IOException
{
if (length > MAX_LENGTH)
{
throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
}
- if(data.isDirect() || data.isReadOnly())
- {
- byte[] dataBytes = new byte[length];
- data.get(dataBytes);
- _data = dataBytes;
- _offset = 0;
- }
- else
- {
-
- _data = data.array();
- _offset = data.arrayOffset() + data.position();
- data.skip(length);
-
- }
+ byte[] dataBytes = new byte[length];
+ data.read(dataBytes);
+ _data = dataBytes;
+ _offset = 0;
_length = length;
}
@@ -275,9 +265,9 @@ public final class AMQShortString implem
return new CharSubSequence(start, end);
}
- public static AMQShortString readFromBuffer(ByteBuffer buffer)
+ public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException
{
- final short length = buffer.getUnsigned();
+ final int length = buffer.readUnsignedByte();
if (length == 0)
{
return null;
@@ -303,13 +293,13 @@ public final class AMQShortString implem
}
}
- public void writeToBuffer(ByteBuffer buffer)
+ public void writeToBuffer(DataOutputStream buffer) throws IOException
{
final int size = length();
//buffer.setAutoExpand(true);
- buffer.put((byte) size);
- buffer.put(_data, _offset, size);
+ buffer.write((byte) size);
+ buffer.write(_data, _offset, size);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java Fri Sep 9 17:47:22 2011
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.math.BigDecimal;
/**
@@ -60,12 +61,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -106,12 +107,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readUnsignedInteger(buffer);
}
@@ -137,7 +138,7 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
BigDecimal bd = (BigDecimal) value;
@@ -150,7 +151,7 @@ public enum AMQType
EncodingUtils.writeInteger(buffer, unscaled);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
byte places = EncodingUtils.readByte(buffer);
@@ -182,12 +183,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -246,7 +247,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
// Ensure that the value is a FieldTable.
if (!(value instanceof FieldTable))
@@ -267,7 +268,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
try
{
@@ -301,10 +302,10 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer)
{ }
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer)
{
return null;
}
@@ -330,12 +331,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLongstr(buffer);
}
@@ -360,12 +361,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -391,12 +392,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -426,12 +427,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeBoolean(buffer, (Boolean) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readBoolean(buffer);
}
@@ -461,12 +462,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeChar(buffer, (Character) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readChar(buffer);
}
@@ -496,12 +497,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeByte(buffer, (Byte) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readByte(buffer);
}
@@ -535,12 +536,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeShort(buffer, (Short) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readShort(buffer);
}
@@ -577,12 +578,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeInteger(buffer, (Integer) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readInteger(buffer);
}
@@ -624,12 +625,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -659,12 +660,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeFloat(buffer, (Float) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readFloat(buffer);
}
@@ -698,12 +699,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, ByteBuffer buffer)
+ public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
{
EncodingUtils.writeDouble(buffer, (Double) value);
}
- public Object readValueFromBuffer(ByteBuffer buffer)
+ public Object readValueFromBuffer(DataInputStream buffer) throws IOException
{
return EncodingUtils.readDouble(buffer);
}
@@ -770,9 +771,9 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeToBuffer(Object value, ByteBuffer buffer)
+ public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException
{
- buffer.put(identifier());
+ buffer.writeByte(identifier());
writeValueImpl(value, buffer);
}
@@ -782,7 +783,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- abstract void writeValueImpl(Object value, ByteBuffer buffer);
+ abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException;
/**
* Reads an instance of the type from a specified byte buffer.
@@ -791,5 +792,5 @@ public enum AMQType
*
* @return An instance of the type.
*/
- abstract Object readValueFromBuffer(ByteBuffer buffer);
+ abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1167311&r1=1167310&r2=1167311&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Fri Sep 9 17:47:22 2011
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.math.BigDecimal;
@@ -60,7 +61,7 @@ public class AMQTypedValue
_value = type.toNativeValue(value);
}
- private AMQTypedValue(AMQType type, ByteBuffer buffer)
+ private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException
{
_type = type;
_value = type.readValueFromBuffer(buffer);
@@ -76,7 +77,7 @@ public class AMQTypedValue
return _value;
}
- public void writeToBuffer(ByteBuffer buffer)
+ public void writeToBuffer(DataOutputStream buffer) throws IOException
{
_type.writeToBuffer(_value, buffer);
}
@@ -86,9 +87,9 @@ public class AMQTypedValue
return _type.getEncodingSize(_value);
}
- public static AMQTypedValue readFromBuffer(ByteBuffer buffer)
+ public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException
{
- AMQType type = AMQTypeMap.getType(buffer.get());
+ AMQType type = AMQTypeMap.getType(buffer.readByte());
return new AMQTypedValue(type, buffer);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org