You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/11 21:16:27 UTC
svn commit: r485852 - in /incubator/qpid/trunk/qpid/java/client/src:
main/java/org/apache/qpid/client/message/JMSStreamMessage.java
test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
Author: rgreig
Date: Mon Dec 11 12:16:26 2006
New Revision: 485852
URL: http://svn.apache.org/viewvc?view=rev&rev=485852
Log:
QPID-102: add type conversion logic in StreamMessage.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=485852&r1=485851&r2=485852
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Mon Dec 11 12:16:26 2006
@@ -20,16 +20,17 @@
*/
package org.apache.qpid.client.message;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.AMQException;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.ContentHeaderBody;
-import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
-import javax.jms.StreamMessage;
+import javax.jms.*;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.HashSet;
/**
* @author Apache Software Foundation
@@ -70,6 +71,69 @@
private static final byte STRING_TYPE = (byte) 10;
/**
+ * Maps from type on the wire to set of valid types that can be converted to
+ */
+ private static final Map<Byte, Set<Byte>> _typeConversionMap;
+
+ static
+ {
+ _typeConversionMap = new HashMap<Byte, Set<Byte>>();
+ Set<Byte> set = new HashSet<Byte>();
+ set.add(BOOLEAN_TYPE);
+ set.add(STRING_TYPE);
+ _typeConversionMap.put(BOOLEAN_TYPE, set);
+ set = new HashSet<Byte>();
+ set.add(BYTE_TYPE);
+ set.add(SHORT_TYPE);
+ set.add(INT_TYPE);
+ set.add(LONG_TYPE);
+ set.add(STRING_TYPE);
+ _typeConversionMap.put(BYTE_TYPE, set);
+ set = new HashSet<Byte>();
+ set.add(SHORT_TYPE);
+ set.add(INT_TYPE);
+ set.add(LONG_TYPE);
+ set.add(STRING_TYPE);
+ _typeConversionMap.put(SHORT_TYPE, set);
+ set = new HashSet<Byte>();
+ set.add(CHAR_TYPE);
+ set.add(STRING_TYPE);
+ _typeConversionMap.put(CHAR_TYPE, set);
+ set = new HashSet<Byte>();
+ set.add(INT_TYPE);
+ set.add(LONG_TYPE);
+ set.add(STRING_TYPE);
+ _typeConversionMap.put(INT_TYPE, set);
+ set = new HashSet<Byte>();
+ set.add(LONG_TYPE);
+ set.add(STRING_TYPE);
+ _typeConversionMap.put(LONG_TYPE, set);
+ set = new HashSet<Byte>();
+ set.add(FLOAT_TYPE);
+ set.add(DOUBLE_TYPE);
+ set.add(STRING_TYPE);
+ _typeConversionMap.put(FLOAT_TYPE, set);
+ set = new HashSet<Byte>();
+ set.add(DOUBLE_TYPE);
+ set.add(STRING_TYPE);
+ _typeConversionMap.put(DOUBLE_TYPE, set);
+ set = new HashSet<Byte>();
+ set.add(BOOLEAN_TYPE);
+ set.add(BYTE_TYPE);
+ set.add(SHORT_TYPE);
+ set.add(CHAR_TYPE);
+ set.add(INT_TYPE);
+ set.add(LONG_TYPE);
+ set.add(FLOAT_TYPE);
+ set.add(DOUBLE_TYPE);
+ set.add(STRING_TYPE);
+ _typeConversionMap.put(STRING_TYPE, set);
+ set = new HashSet<Byte>();
+ set.add(BYTEARRAY_TYPE);
+ _typeConversionMap.put(BYTEARRAY_TYPE, set);
+ }
+
+ /**
* This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
* a byte array in multiple chunks, hence this is used to track how much is left to be read
*/
@@ -79,7 +143,7 @@
{
this(null);
}
-
+
/**
* Construct a stream message with existing data.
*
@@ -103,25 +167,38 @@
return MIME_TYPE;
}
- private void readAndCheckType(byte type) throws MessageFormatException
+ private byte readAndCheckType() throws MessageFormatException, MessageEOFException,
+ MessageNotReadableException
{
- if (_data.get() != type)
- {
- throw new MessageFormatException("Type " + _typeNames[type - 1] + " not found next in stream");
- }
+ checkReadable();
+ checkAvailable(1);
+ return _data.get();
}
- private void writeTypeDiscriminator(byte type)
+ private void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
{
+ checkWritable();
_data.put(type);
}
public boolean readBoolean() throws JMSException
{
- checkReadable();
- checkAvailable(2);
- readAndCheckType(BOOLEAN_TYPE);
- return readBooleanImpl();
+ byte wireType = readAndCheckType();
+ boolean result = false;
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
}
private boolean readBooleanImpl()
@@ -131,10 +208,22 @@
public byte readByte() throws JMSException
{
- checkReadable();
- checkAvailable(2);
- readAndCheckType(BYTE_TYPE);
- return readByteImpl();
+ byte wireType = readAndCheckType();
+ byte result = (byte)0;
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ return result;
}
private byte readByteImpl()
@@ -144,10 +233,26 @@
public short readShort() throws JMSException
{
- checkReadable();
- checkAvailable(3);
- readAndCheckType(SHORT_TYPE);
- return readShortImpl();
+ byte wireType = readAndCheckType();
+ short result = (short) 0;
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ return result;
}
private short readShortImpl()
@@ -163,10 +268,16 @@
*/
public char readChar() throws JMSException
{
- checkReadable();
- checkAvailable(3);
- readAndCheckType(CHAR_TYPE);
- return readCharImpl();
+ byte wireType = readAndCheckType();
+ if (wireType != CHAR_TYPE)
+ {
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
}
private char readCharImpl()
@@ -176,10 +287,30 @@
public int readInt() throws JMSException
{
- checkReadable();
- checkAvailable(5);
- readAndCheckType(INT_TYPE);
- return readIntImpl();
+ byte wireType = readAndCheckType();
+ int result = 0;
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
}
private int readIntImpl()
@@ -189,10 +320,34 @@
public long readLong() throws JMSException
{
- checkReadable();
- checkAvailable(9);
- readAndCheckType(LONG_TYPE);
- return readLongImpl();
+ byte wireType = readAndCheckType();
+ long result = 0L;
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
}
private long readLongImpl()
@@ -202,10 +357,22 @@
public float readFloat() throws JMSException
{
- checkReadable();
- checkAvailable(5);
- readAndCheckType(FLOAT_TYPE);
- return readFloatImpl();
+ byte wireType = readAndCheckType();
+ float result = 0f;
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
}
private float readFloatImpl()
@@ -215,10 +382,26 @@
public double readDouble() throws JMSException
{
- checkReadable();
- checkAvailable(9);
- readAndCheckType(DOUBLE_TYPE);
- return readDoubleImpl();
+ byte wireType = readAndCheckType();
+ double result = 0d;
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
}
private double readDoubleImpl()
@@ -228,12 +411,50 @@
public String readString() throws JMSException
{
- checkReadable();
- // we check only for one byte plus the type byte since theoretically the string could be only a
- // single byte when using UTF-8 encoding
- checkAvailable(2);
- readAndCheckType(STRING_TYPE);
- return readStringImpl();
+ byte wireType = readAndCheckType();
+ String result = null;
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
}
private String readStringImpl() throws JMSException
@@ -260,9 +481,15 @@
// first call
if (_byteArrayRemaining == -1)
{
- // type discriminator plus array size
- checkAvailable(5);
- readAndCheckType(BYTEARRAY_TYPE);
+ // type discriminator checked separately so you get a MessageFormatException rather than
+ // an EOF even in the case where both would be applicable
+ checkAvailable(1);
+ byte wireType = readAndCheckType();
+ if (wireType != BYTEARRAY_TYPE)
+ {
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
+ }
+ checkAvailable(4);
int size = _data.getInt();
// size of -1 indicates null
if (size == -1)
@@ -292,7 +519,7 @@
_byteArrayRemaining -= count;
if (_byteArrayRemaining == 0)
{
- _byteArrayRemaining = -1;
+ _byteArrayRemaining = -1;
}
if (count == 0)
{
@@ -307,16 +534,16 @@
public Object readObject() throws JMSException
{
- checkReadable();
- checkAvailable(1);
- byte type = _data.get();
+ byte wireType = readAndCheckType();
Object result = null;
- switch (type)
+ switch (wireType)
{
case BOOLEAN_TYPE:
+ checkAvailable(1);
result = readBooleanImpl();
break;
case BYTE_TYPE:
+ checkAvailable(1);
result = readByteImpl();
break;
case BYTEARRAY_TYPE:
@@ -334,24 +561,31 @@
}
break;
case SHORT_TYPE:
+ checkAvailable(2);
result = readShortImpl();
break;
case CHAR_TYPE:
+ checkAvailable(2);
result = readCharImpl();
break;
case INT_TYPE:
+ checkAvailable(4);
result = readIntImpl();
break;
case LONG_TYPE:
+ checkAvailable(8);
result = readLongImpl();
break;
case FLOAT_TYPE:
+ checkAvailable(4);
result = readFloatImpl();
break;
case DOUBLE_TYPE:
+ checkAvailable(8);
result = readDoubleImpl();
break;
case STRING_TYPE:
+ checkAvailable(1);
result = readStringImpl();
break;
}
@@ -360,63 +594,54 @@
public void writeBoolean(boolean b) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(BOOLEAN_TYPE);
_data.put(b ? (byte) 1 : (byte) 0);
}
public void writeByte(byte b) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(BYTE_TYPE);
_data.put(b);
}
public void writeShort(short i) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(SHORT_TYPE);
_data.putShort(i);
}
public void writeChar(char c) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(CHAR_TYPE);
_data.putChar(c);
}
public void writeInt(int i) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(INT_TYPE);
_data.putInt(i);
}
public void writeLong(long l) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(LONG_TYPE);
_data.putLong(l);
}
public void writeFloat(float v) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(FLOAT_TYPE);
_data.putFloat(v);
}
public void writeDouble(double v) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(DOUBLE_TYPE);
_data.putDouble(v);
}
public void writeString(String string) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(STRING_TYPE);
try
{
@@ -434,13 +659,11 @@
public void writeBytes(byte[] bytes) throws JMSException
{
- checkWritable();
writeBytes(bytes, 0, bytes == null?0:bytes.length);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
- checkWritable();
writeTypeDiscriminator(BYTEARRAY_TYPE);
if (bytes == null)
{
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java?view=diff&rev=485852&r1=485851&r2=485852
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java Mon Dec 11 12:16:26 2006
@@ -24,10 +24,7 @@
import org.apache.qpid.client.message.JMSStreamMessage;
import org.apache.qpid.client.message.TestMessageHelper;
-import javax.jms.MessageNotReadableException;
-import javax.jms.MessageNotWriteableException;
-import javax.jms.MessageFormatException;
-import javax.jms.MessageEOFException;
+import javax.jms.*;
import java.util.HashMap;
/**
@@ -240,7 +237,7 @@
len = bm.readBytes(result);
assertEquals(1, len);
len = bm.readBytes(result);
- assertEquals(2, len);
+ assertEquals(2, len);
}
public void testEOFByte() throws Exception
@@ -418,13 +415,181 @@
fail("expected MessageEOFException, got " + e);
}
}
-
+
public void testToBodyStringWithNull() throws Exception
{
JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
bm.reset();
String result = bm.toBodyString();
assertNull(result);
+ }
+
+ private void checkConversionsFail(StreamMessage sm, int[] conversions) throws JMSException
+ {
+ for (int conversion : conversions)
+ {
+ try
+ {
+ switch (conversion)
+ {
+ case 0:
+ sm.readBoolean();
+ break;
+ case 1:
+ sm.readByte();
+ break;
+ case 2:
+ sm.readShort();
+ break;
+ case 3:
+ sm.readChar();
+ break;
+ case 4:
+ sm.readInt();
+ break;
+ case 5:
+ sm.readLong();
+ break;
+ case 6:
+ sm.readFloat();
+ break;
+ case 7:
+ sm.readDouble();
+ break;
+ case 8:
+ sm.readString();
+ break;
+ case 9:
+ sm.readBytes(new byte[3]);
+ break;
+ }
+ fail("MessageFormatException was not thrown");
+ }
+ catch (MessageFormatException e)
+ {
+ // PASS
+ }
+ sm.reset();
+ }
+ }
+ public void testBooleanConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeBoolean(true);
+ bm.reset();
+ String result = bm.readString();
+ assertEquals("true", result);
+ bm.reset();
+ checkConversionsFail(bm, new int[]{1,2,3,4,5,6,7,9});
+ }
+
+ public void testByteConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeByte((byte) 43);
+ bm.reset();
+ assertEquals(43, bm.readShort());
+ bm.reset();
+ assertEquals(43, bm.readInt());
+ bm.reset();
+ assertEquals(43, bm.readLong());
+ bm.reset();
+ String result = bm.readString();
+ assertEquals("43", result);
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 3, 6, 7, 9});
+ }
+
+ public void testShortConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeShort((short) 87);
+ bm.reset();
+ assertEquals(87, bm.readInt());
+ bm.reset();
+ assertEquals(87, bm.readLong());
+ bm.reset();
+ assertEquals("87", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 3, 6, 7, });
+ }
+
+ public void testCharConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeChar('d');
+ bm.reset();
+ assertEquals("d", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 4, 5, 6, 7, 9});
+ }
+
+ public void testIntConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeInt(167);
+ bm.reset();
+ assertEquals(167, bm.readLong());
+ bm.reset();
+ assertEquals("167", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 3, 6, 7, 9});
+ }
+
+ public void testLongConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeLong(1678);
+ bm.reset();
+ assertEquals("1678", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 6, 7, 9});
+ }
+
+ public void testFloatConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeFloat(6.2f);
+ bm.reset();
+ assertEquals(6.2d, bm.readDouble(), 0.01);
+ bm.reset();
+ assertEquals("6.2", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 5, 9});
+ }
+
+ public void testDoubleConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeDouble(88.35d);
+ bm.reset();
+ assertEquals("88.35", bm.readString());
+ bm.reset();
+ checkConversionsFail(bm, new int[]{0, 1, 2, 3, 4, 5, 6, 9});
+ }
+
+ public void testStringConversions() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString("true");
+ bm.reset();
+ assertEquals(true, bm.readBoolean());
+ bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString("2");
+ bm.reset();
+ assertEquals((byte)2, bm.readByte());
+ bm.reset();
+ assertEquals((short)2, bm.readShort());
+ bm.reset();
+ assertEquals((int)2, bm.readInt());
+ bm.reset();
+ assertEquals((long)2, bm.readLong());
+ bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString("5.7");
+ bm.reset();
+ assertEquals(5.7f, bm.readFloat());
+ bm.reset();
+ assertEquals(5.7d, bm.readDouble());
}
public static junit.framework.Test suite()