You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/09/24 16:26:16 UTC
svn commit: r578827 - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpidity/nclient/
client/src/main/java/org/apache/qpidity/nclient/impl/ common/
common/src/main/java/org/apache/qpidity/
common/src/main/java/org/apache/qpidity/co...
Author: rhs
Date: Mon Sep 24 07:26:14 2007
New Revision: 578827
URL: http://svn.apache.org/viewvc?rev=578827&view=rev
Log:
added field table encoding/decoding
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/common/generate
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Mon Sep 24 07:26:14 2007
@@ -260,7 +260,7 @@
* on the providers implementation.
*/
public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
- MessagePartListener listener, Map<String, ?> filter, Option... options);
+ MessagePartListener listener, Map<String, Object> filter, Option... options);
/**
* This method cancels a consumer. This does not affect already delivered messages, but it does
@@ -481,7 +481,7 @@
* {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NO_OPTION})
* @see Option
*/
- public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options);
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments, Option... options);
/**
* Bind a queue with an exchange.
@@ -498,7 +498,7 @@
* routing keys depends on the exchange implementation.
* @param arguments Used for backward compatibility
*/
- public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
/**
* Unbind a queue from an exchange.
@@ -508,7 +508,7 @@
* @param routingKey Specifies the routing key of the binding to unbind.
* @param arguments Used for backward compatibility
*/
- public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
/**
* This method removes all messages from a queue. It does not cancel consumers. Purged messages
@@ -573,7 +573,7 @@
* @param arguments Used for backward compatibility
* @see Option
*/
- public void exchangeDeclare(String exchangeName, String type, String alternateExchange, Map<String, ?> arguments,
+ public void exchangeDeclare(String exchangeName, String type, String alternateExchange, Map<String, Object> arguments,
Option... options);
/**
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Mon Sep 24 07:26:14 2007
@@ -35,7 +35,7 @@
}
}
- public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options)
+ public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options)
{
setMessageListener(destination,listener);
super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options);
Modified: incubator/qpid/trunk/qpid/java/common/generate
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/generate?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/generate (original)
+++ incubator/qpid/trunk/qpid/java/common/generate Mon Sep 24 07:26:14 2007
@@ -21,6 +21,7 @@
self.line("package %s;" % self.package)
self.line()
+ self.line("import java.util.List;")
self.line("import java.util.Map;")
self.line("import java.util.UUID;")
self.line()
@@ -54,12 +55,55 @@
"short": "int",
"octet": "short",
"bit": "boolean",
- "table": "Map<String,?>",
+ "table": "Map<String,Object>",
"timestamp": "long",
"content": "String",
"uuid": "UUID",
"rfc1982-long-set": "RangeSet",
- "long-struct": "Struct"
+ "long-struct": "Struct",
+ "signed-byte": "byte",
+ "unsigned-byte": "short",
+ "char": "char",
+ "boolean": "boolean",
+ "two-octets": "short",
+ "signed-short": "short",
+ "unsigned-short": "int",
+ "four-octets": "int",
+ "signed-int": "int",
+ "unsigned-int": "long",
+ "float": "float",
+ "utf32-char": "char",
+ "eight-octets": "long",
+ "signed-long": "long",
+ "unsigned-long": "long",
+ "double": "double",
+ "datetime": "long",
+ "sixteen-octets": "byte[]",
+ "thirty-two-octets": "byte[]",
+ "sixty-four-octets": "byte[]",
+ "_128-octets": "byte[]",
+ "short-binary": "byte[]",
+ "short-string": "String",
+ "short-utf8-string": "String",
+ "short-utf16-string": "String",
+ "short-utf32-string": "String",
+ "binary": "byte[]",
+ "string": "String",
+ "utf8-string": "String",
+ "utf16-string": "String",
+ "utf32-string": "String",
+ "long-binary": "byte[]",
+ "long-string": "String",
+ "long-utf8-string": "String",
+ "long-utf16-string": "String",
+ "long-utf32-string": "String",
+ "sequence": "List<Object>",
+ "array": "List<Object>",
+ "five-octets": "byte[]",
+ "decimal": "byte[]",
+ "nine-octets": "byte[]",
+ "long-decimal": "byte[]",
+ "void": "Void"
}
TRACKS = {
@@ -80,6 +124,54 @@
def scream(*args):
return "_".join([a.replace("-", "_").upper() for a in args])
+
+
+# Generate Type.java: a Java enum with one member per spec constant whose
+# @class is "field-table-type", plus a static get(byte) lookup by type code.
+types = Output(out_dir, out_pkg, "Type")
+types.line("public enum Type")
+types.line("{")
+# codes maps each constant's @value to its (prefix-stripped) name; it is
+# used further down to emit the switch inside Type.get().
+codes = {}
+for c in spec.query["amqp/constant"]:
+ if c["@class"] == "field-table-type":
+ name = c["@name"]
+ # drop the leading "field-table-" (12 characters)
+ if name.startswith("field-table-"):
+ name = name[12:]
+ # the name is later turned into an enum constant via scream(), so a
+ # leading digit is prefixed with "_"
+ if name[0].isdigit():
+ name = "_" + name
+ val = c["@value"]
+ codes[val] = name
+ # @width marks a fixed-width type; @lfwidth is checked second and, when
+ # present, replaces the width and marks the type as not fixed.
+ # NOTE(review): width/fixed are only assigned under these two checks --
+ # confirm every field-table-type constant carries one of the attributes.
+ if c["@width"] != None:
+ width = c["@width"]
+ fixed = "true"
+ if c["@lfwidth"] != None:
+ width = c["@lfwidth"]
+ fixed = "false"
+ types.line(" %s((byte) %s, %s, %s)," %
+ (scream(name), val, width, fixed))
+types.line(" ;")
+
+# public fields of the generated enum
+types.line(" public byte code;")
+types.line(" public int width;")
+types.line(" public boolean fixed;")
+
+# constructor: copies each argument into the field of the same name
+types.line(" Type(byte code, int width, boolean fixed)")
+types.line(" {")
+for arg in ("code", "width", "fixed"):
+ types.line(" this.%s = %s;" % (arg, arg))
+types.line(" }")
+
+# Type.get(byte): one case per collected code; unknown codes return null
+types.line(" public static Type get(byte code)")
+types.line(" {")
+types.line(" switch (code)")
+types.line(" {")
+for code, name in codes.items():
+ types.line(" case (byte) %s: return %s;" % (code, scream(name)))
+types.line(" default: return null;")
+types.line(" }")
+types.line(" }")
+
+types.line("}")
+types.write()
+
const = Output(out_dir, out_pkg, "Constant")
const.line("public interface Constant")
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java Mon Sep 24 07:26:14 2007
@@ -122,6 +122,12 @@
{
System.out.println("received headers routing_key " + props.getRoutingKey());
}
+ MessageProperties mp = header.get(MessageProperties.class);
+ System.out.println("MP: " + mp);
+ if (mp != null)
+ {
+ System.out.println(mp.getApplicationHeaders());
+ }
this.header = header;
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java Mon Sep 24 07:26:14 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpidity;
+import java.util.*;
+
import org.apache.qpidity.transport.*;
import org.apache.qpidity.transport.network.mina.MinaHandler;
@@ -78,9 +80,32 @@
ssn.queueDeclare("asdf", null, null);
ssn.sync();
+ Map<String,Object> nested = new LinkedHashMap<String,Object>();
+ nested.put("list", Arrays.asList("one", "two", "three"));
+ Map<String,Object> map = new LinkedHashMap<String,Object>();
+
+ map.put("str", "this is a string");
+
+ map.put("+int", 3);
+ map.put("-int", -3);
+ map.put("maxint", Integer.MAX_VALUE);
+ map.put("minint", Integer.MIN_VALUE);
+
+ map.put("+short", (short) 1);
+ map.put("-short", (short) -1);
+ map.put("maxshort", (short) Short.MAX_VALUE);
+ map.put("minshort", (short) Short.MIN_VALUE);
+
+ map.put("float", (float) 3.3);
+ map.put("double", 4.9);
+ map.put("char", 'c');
+
+ map.put("table", nested);
+ map.put("list", Arrays.asList(1, 2, 3));
+
ssn.messageTransfer("asdf", (short) 0, (short) 1);
ssn.header(new DeliveryProperties(),
- new MessageProperties());
+ new MessageProperties().setApplicationHeaders(map));
ssn.data("this is the data");
ssn.endData();
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java Mon Sep 24 07:26:14 2007
@@ -20,11 +20,15 @@
*/
package org.apache.qpidity.codec;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.Type;
import static org.apache.qpidity.transport.util.Functions.*;
@@ -40,20 +44,37 @@
private final byte major;
private final byte minor;
+ private int count;
protected AbstractDecoder(byte major, byte minor)
{
this.major = major;
this.minor = minor;
+ this.count = 0;
}
- protected abstract byte get();
+ protected abstract byte doGet();
- protected abstract void get(byte[] bytes);
+ protected abstract void doGet(byte[] bytes);
+
+    /**
+     * Reads a single byte via {@link #doGet()} after calling clearBits(),
+     * and advances the running byte count by one.
+     *
+     * @return the byte that was read
+     */
+    protected byte get()
+    {
+        clearBits();
+        final byte octet = doGet();
+        count++;
+        return octet;
+    }
+
+    /**
+     * Fills the given array via {@link #doGet(byte[])} after calling
+     * clearBits(), and advances the running byte count by the array length.
+     *
+     * @param bytes destination buffer; filled completely
+     */
+    protected void get(byte[] bytes)
+    {
+        clearBits();
+        doGet(bytes);
+        count = count + bytes.length;
+    }
protected short uget()
{
- return unsigned(get());
+ return (short) (0xFF & get());
}
private byte bits = 0x0;
@@ -81,13 +102,11 @@
public short readOctet()
{
- clearBits();
return uget();
}
public int readShort()
{
- clearBits();
int i = uget() << 8;
i |= uget();
return i;
@@ -95,7 +114,6 @@
public long readLong()
{
- clearBits();
long l = uget() << 24;
l |= uget() << 16;
l |= uget() << 8;
@@ -105,15 +123,11 @@
public long readLonglong()
{
- clearBits();
- long l = uget() << 56;
- l |= uget() << 48;
- l |= uget() << 40;
- l |= uget() << 32;
- l |= uget() << 24;
- l |= uget() << 16;
- l |= uget() << 8;
- l |= uget();
+ long l = 0;
+ for (int i = 0; i < 8; i++)
+ {
+ l |= ((long) (0xFF & get())) << (56 - i*8);
+ }
return l;
}
@@ -134,18 +148,11 @@
public String readLongstr()
{
long size = readLong();
- assert size <= Integer.MAX_VALUE;
byte[] bytes = new byte[(int) size];
get(bytes);
return new String(bytes);
}
- public Map<String,?> readTable()
- {
- //throw new Error("TODO");
- return null;
- }
-
public RangeSet readRfc1982LongSet()
{
int count = readShort()/8;
@@ -189,6 +196,175 @@
Struct result = Struct.create(type);
result.read(this, major, minor);
return result;
+ }
+ }
+
+    /**
+     * Decodes a field table: a 32-bit byte size followed by entries, each
+     * made of a short-string key, a one-byte type code and a value of that
+     * type. Entries are returned in the order they were read.
+     *
+     * @return the decoded table; empty when the encoded size is zero
+     * @throws IllegalArgumentException if an entry carries a type code that
+     *         Type.get() does not recognise
+     */
+    public Map<String,Object> readTable()
+    {
+        long size = readLong();
+        // count is advanced by every get(); the table ends once size bytes
+        // beyond the current position have been consumed
+        long end = count + size;
+        Map<String,Object> result = new LinkedHashMap<String,Object>();
+        while (count < end)
+        {
+            String key = readShortstr();
+            byte code = get();
+            Type t = Type.get(code);
+            if (t == null)
+            {
+                throw new IllegalArgumentException
+                    ("unknown type code " + code + " for key: " + key);
+            }
+            result.put(key, read(t));
+        }
+        return result;
+    }
+
+    /**
+     * Decodes a sequence: a 32-bit byte size followed by elements, each
+     * made of a one-byte type code and a value of that type.
+     *
+     * @return the decoded elements in wire order; empty when the encoded
+     *         size is zero
+     * @throws IllegalArgumentException if an element carries a type code
+     *         that Type.get() does not recognise
+     */
+    public List<Object> readSequence()
+    {
+        long size = readLong();
+        // count is advanced by every get(); the sequence ends once size
+        // bytes beyond the current position have been consumed
+        long end = count + size;
+        List<Object> result = new ArrayList<Object>();
+        while (count < end)
+        {
+            byte code = get();
+            Type t = Type.get(code);
+            if (t == null)
+            {
+                throw new IllegalArgumentException
+                    ("unknown type code in sequence: " + code);
+            }
+            result.add(read(t));
+        }
+        return result;
+    }
+
+    /**
+     * Decodes an array: a 32-bit byte size, a one-byte element type code, a
+     * 32-bit element count, then that many values of the element type.
+     *
+     * @return the decoded elements in wire order
+     * @throws IllegalArgumentException if the element type code is one that
+     *         Type.get() does not recognise
+     */
+    public List<Object> readArray()
+    {
+        // the byte size has to be consumed, but decoding is driven by the
+        // element count that follows the type code
+        readLong();
+        byte code = get();
+        Type t = Type.get(code);
+        if (t == null)
+        {
+            throw new IllegalArgumentException
+                ("unknown array element type code: " + code);
+        }
+        // named "length" so it does not shadow the byte-counter field
+        long length = readLong();
+
+        List<Object> result = new ArrayList<Object>();
+        // long index: the bound is a long, an int index could wrap around
+        for (long i = 0; i < length; i++)
+        {
+            result.add(read(t));
+        }
+        return result;
+    }
+
+    /**
+     * Returns the number of payload bytes for a value of the given type:
+     * the type's own width when it is fixed, otherwise a length prefix of
+     * 1, 2 or 4 bytes read from the input.
+     *
+     * @throws IllegalStateException if a variable-width type declares a
+     *         prefix width other than 1, 2 or 4
+     */
+    private long readSize(Type t)
+    {
+        if (t.fixed)
+        {
+            return t.width;
+        }
+
+        final int prefix = t.width;
+        if (prefix == 1)
+        {
+            return readOctet();
+        }
+        if (prefix == 2)
+        {
+            return readShort();
+        }
+        if (prefix == 4)
+        {
+            return readLong();
+        }
+        throw new IllegalStateException("irregular width: " + t);
+    }
+
+    /**
+     * Reads the raw payload of a value of the given type: the length comes
+     * from {@link #readSize(Type)}, then exactly that many bytes are read.
+     */
+    private byte[] readBytes(Type t)
+    {
+        byte[] payload = new byte[(int) readSize(t)];
+        get(payload);
+        return payload;
+    }
+
+    /**
+     * Decodes one value of the given field-table type and returns it boxed.
+     * Integer types map to the Java type used by the matching read method,
+     * binary and decimal types to byte[], string types to String (decoded
+     * with the platform charset), containers to Map/List, and VOID to null.
+     * Types without an explicit case are read as raw bytes.
+     *
+     * @param t the type to decode; must not be null
+     * @return the decoded value, or null for VOID
+     */
+    private Object read(Type t)
+    {
+        switch (t)
+        {
+        case OCTET:
+        case UNSIGNED_BYTE:
+            return readOctet();
+        case SIGNED_BYTE:
+            return get();
+        case CHAR:
+            // mask first: a plain (char) cast would sign-extend bytes
+            // >= 0x80 to 0xFFxx
+            return (char) (0xFF & get());
+        case BOOLEAN:
+            // any non-zero octet is true; get() is signed, so "> 0" would
+            // decode 0x80-0xFF as false
+            return get() != 0;
+
+        case TWO_OCTETS:
+        case UNSIGNED_SHORT:
+            return readShort();
+
+        case SIGNED_SHORT:
+            return (short) readShort();
+
+        case FOUR_OCTETS:
+        case UNSIGNED_INT:
+            return readLong();
+
+        case UTF32_CHAR:
+        case SIGNED_INT:
+            return (int) readLong();
+
+        case FLOAT:
+            return Float.intBitsToFloat((int) readLong());
+
+        case EIGHT_OCTETS:
+        case SIGNED_LONG:
+        case UNSIGNED_LONG:
+        case DATETIME:
+            return readLonglong();
+
+        case DOUBLE:
+            return Double.longBitsToDouble(readLonglong());
+
+        case SIXTEEN_OCTETS:
+        case THIRTY_TWO_OCTETS:
+        case SIXTY_FOUR_OCTETS:
+        case _128_OCTETS:
+        case SHORT_BINARY:
+        case BINARY:
+        case LONG_BINARY:
+            return readBytes(t);
+
+        case UUID:
+            return readUuid();
+
+        case SHORT_STRING:
+        case SHORT_UTF8_STRING:
+        case SHORT_UTF16_STRING:
+        case SHORT_UTF32_STRING:
+        case STRING:
+        case UTF8_STRING:
+        case UTF16_STRING:
+        case UTF32_STRING:
+        case LONG_STRING:
+        case LONG_UTF8_STRING:
+        case LONG_UTF16_STRING:
+        case LONG_UTF32_STRING:
+            // XXX: need to do character conversion
+            return new String(readBytes(t));
+
+        case TABLE:
+            return readTable();
+        case SEQUENCE:
+            return readSequence();
+        case ARRAY:
+            return readArray();
+
+        case FIVE_OCTETS:
+        case DECIMAL:
+        case NINE_OCTETS:
+        case LONG_DECIMAL:
+            // XXX: what types are we supposed to use here?
+            return readBytes(t);
+
+        case VOID:
+            return null;
+
+        default:
+            return readBytes(t);
+        }
 }
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java Mon Sep 24 07:26:14 2007
@@ -22,12 +22,15 @@
import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
import org.apache.qpidity.transport.Range;
import org.apache.qpidity.transport.RangeSet;
import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.Type;
import static org.apache.qpidity.transport.util.Functions.*;
@@ -41,18 +44,57 @@
abstract class AbstractEncoder implements Encoder
{
+    /**
+     * Default wire type for each supported Java class. Looked up by
+     * resolve(), which also tries superclasses and interfaces, so any Map
+     * or List implementation matches the Map.class / List.class entries.
+     */
+    private static final Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>();
+    static
+    {
+        ENCODINGS.put(String.class, Type.LONG_STRING);
+        ENCODINGS.put(Long.class, Type.SIGNED_LONG);
+        ENCODINGS.put(Integer.class, Type.SIGNED_INT);
+        ENCODINGS.put(Short.class, Type.SIGNED_SHORT);
+        ENCODINGS.put(Byte.class, Type.SIGNED_BYTE);
+        ENCODINGS.put(Map.class, Type.TABLE);
+        ENCODINGS.put(List.class, Type.SEQUENCE);
+        ENCODINGS.put(Float.class, Type.FLOAT);
+        ENCODINGS.put(Double.class, Type.DOUBLE);
+        ENCODINGS.put(Character.class, Type.CHAR);
+    }
+
private final byte major;
private final byte minor;
+ private final boolean calcsize;
- protected AbstractEncoder(byte major, byte minor)
+ protected AbstractEncoder(byte major, byte minor, boolean calcsize)
{
this.major = major;
this.minor = minor;
+ this.calcsize = calcsize;
+ }
+
+ protected AbstractEncoder(byte major, byte minor)
+ {
+ this(major, minor, true);
}
- protected abstract void put(byte b);
+ protected abstract void doPut(byte b);
- protected abstract void put(ByteBuffer src);
+ protected abstract void doPut(ByteBuffer src);
+
+ /**
+ * Writes one byte. flushBits() runs first, so a pending byte of packed
+ * bits is emitted (via doPut) before this byte.
+ */
+ protected void put(byte b)
+ {
+ flushBits();
+ doPut(b);
+ }
+
+ /**
+ * Writes the contents of the given buffer. flushBits() runs first, so a
+ * pending byte of packed bits is emitted (via doPut) before the buffer.
+ */
+ protected void put(ByteBuffer src)
+ {
+ flushBits();
+ doPut(src);
+ }
+
+    /**
+     * Writes all bytes of the given array by wrapping it in a ByteBuffer
+     * and delegating to {@link #put(ByteBuffer)}.
+     */
+    protected void put(byte[] bytes)
+    {
+        final ByteBuffer wrapped = ByteBuffer.wrap(bytes);
+        put(wrapped);
+    }
private byte bits = 0x0;
private byte nbits = 0;
@@ -76,17 +118,21 @@
{
if (nbits > 0)
{
- put(bits);
+ doPut(bits);
bits = 0x0;
nbits = 0;
}
}
+ /**
+ * Emits any pending byte of packed bits by delegating to flushBits().
+ */
+ public void flush()
+ {
+ flushBits();
+ }
+
public void writeOctet(short b)
{
assert b < 0x100;
- flushBits();
put((byte) b);
}
@@ -94,7 +140,6 @@
{
assert s < 0x10000;
- flushBits();
put(lsb(s >>> 8));
put(lsb(s));
}
@@ -103,7 +148,6 @@
{
assert i < 0x100000000L;
- flushBits();
put(lsb(i >>> 24));
put(lsb(i >>> 16));
put(lsb(i >>> 8));
@@ -112,21 +156,15 @@
public void writeLonglong(long l)
{
- flushBits();
- put(lsb(l >>> 56));
- put(lsb(l >>> 48));
- put(lsb(l >>> 40));
- put(lsb(l >>> 32));
- put(lsb(l >>> 24));
- put(lsb(l >>> 16));
- put(lsb(l >>> 8));
- put(lsb(l));
+ for (int i = 0; i < 8; i++)
+ {
+ put(lsb(l >> (56 - i*8)));
+ }
}
public void writeTimestamp(long l)
{
- flushBits();
writeLonglong(l);
}
@@ -149,11 +187,6 @@
}
- public void writeTable(Map<String,?> table)
- {
- //throw new Error("TODO");
- }
-
public void writeRfc1982LongSet(RangeSet ranges)
{
if (ranges == null)
@@ -197,19 +230,330 @@
}
else
{
- SizeEncoder sizer = new SizeEncoder(major, minor);
- sizer.writeShort(s.getEncodedType());
- s.write(sizer, major, minor);
+ int size = 0;
+ if (calcsize)
+ {
+ SizeEncoder sizer = new SizeEncoder(major, minor);
+ sizer.writeShort(s.getEncodedType());
+ s.write(sizer, major, minor);
+ size = sizer.getSize();
+ }
- writeLong(sizer.getSize());
+ writeLong(size);
writeShort(s.getEncodedType());
s.write(this, major, minor);
}
}
- public void flush()
+ /**
+ * Picks the wire type for a value: Type.VOID for null, otherwise whatever
+ * resolve() finds for the value's runtime class.
+ *
+ * @throws IllegalArgumentException if resolve() finds no type for the class
+ */
+ private Type encoding(Object value)
 {
- flushBits();
+ if (value == null)
+ {
+ return Type.VOID;
+ }
+
+ Class klass = value.getClass();
+ Type type = resolve(klass);
+
+ // null here means neither the class nor any supertype resolve() tried
+ // has an entry in ENCODINGS
+ if (type == null)
+ {
+ throw new IllegalArgumentException
+ ("unable to resolve type: " + klass + ", " + value);
+ }
+ else
+ {
+ return type;
+ }
+ }
+
+    /**
+     * Finds the wire type registered in ENCODINGS for the given class,
+     * trying the class itself, then (recursively) its superclass, then
+     * (recursively) each directly implemented interface.
+     *
+     * @return the first match in that order, or null when nothing matches
+     */
+    private Type resolve(Class<?> klass)
+    {
+        Type type = ENCODINGS.get(klass);
+        if (type != null)
+        {
+            return type;
+        }
+
+        // getSuperclass() is null for Object, interfaces and primitives
+        Class<?> sup = klass.getSuperclass();
+        if (sup != null)
+        {
+            type = resolve(sup);
+            if (type != null)
+            {
+                return type;
+            }
+        }
+
+        for (Class<?> iface : klass.getInterfaces())
+        {
+            type = resolve(iface);
+            if (type != null)
+            {
+                return type;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Encodes a field table as a 32-bit size followed by its entries. A
+     * null table is written as size zero with no entries. When calcsize is
+     * false the size field is written as zero without measuring.
+     */
+    public void writeTable(Map<String,Object> table)
+    {
+        if (table != null)
+        {
+            int encodedSize = 0;
+            if (calcsize)
+            {
+                // dry run against a SizeEncoder to learn the entry length
+                SizeEncoder measure = new SizeEncoder(major, minor);
+                measure.writeTableEntries(table);
+                encodedSize = measure.getSize();
+            }
+
+            writeLong(encodedSize);
+            writeTableEntries(table);
+        }
+        else
+        {
+            writeLong(0);
+        }
+    }
+
+    /**
+     * Writes every entry of the table, in the map's iteration order, as
+     * short-string key, one-byte type code, then the encoded value.
+     */
+    protected void writeTableEntries(Map<String,Object> table)
+    {
+        for (Map.Entry<String,Object> e : table.entrySet())
+        {
+            String name = e.getKey();
+            Object item = e.getValue();
+            // resolve the type before writing anything for this entry
+            Type wireType = encoding(item);
+            writeShortstr(name);
+            put(wireType.code);
+            write(wireType, item);
+        }
+    }
+
+    /**
+     * Encodes a sequence as a 32-bit size followed by its elements. A null
+     * sequence is written as size zero with no elements, matching how
+     * writeTable treats a null table. When calcsize is false the size
+     * field is written as zero without measuring.
+     */
+    public void writeSequence(List<Object> sequence)
+    {
+        if (sequence == null)
+        {
+            writeLong(0);
+            return;
+        }
+
+        int size = 0;
+        if (calcsize)
+        {
+            // dry run against a SizeEncoder to learn the element length
+            SizeEncoder sizer = new SizeEncoder(major, minor);
+            sizer.writeSequenceEntries(sequence);
+            size = sizer.getSize();
+        }
+
+        writeLong(size);
+        writeSequenceEntries(sequence);
+    }
+
+    /**
+     * Writes each element of the sequence as a one-byte type code followed
+     * by the encoded value; the type is chosen per element.
+     */
+    protected void writeSequenceEntries(List<Object> sequence)
+    {
+        for (Object item : sequence)
+        {
+            final Type wireType = encoding(item);
+            put(wireType.code);
+            write(wireType, item);
+        }
+    }
+
+    /**
+     * Encodes an array as a 32-bit size followed by the output of
+     * writeArrayEntries. When calcsize is false the size field is written
+     * as zero without measuring.
+     */
+    public void writeArray(List<Object> array)
+    {
+        int encodedSize = 0;
+        if (calcsize)
+        {
+            // dry run against a SizeEncoder to learn the body length
+            SizeEncoder measure = new SizeEncoder(major, minor);
+            measure.writeArrayEntries(array);
+            encodedSize = measure.getSize();
+        }
+
+        writeLong(encodedSize);
+        writeArrayEntries(array);
+    }
+
+    /**
+     * Writes the body of an array: a one-byte element type code, a 32-bit
+     * element count, then every element encoded with that one type. The
+     * type is taken from the first element; an empty array uses Type.VOID.
+     *
+     * @throws IllegalArgumentException if a later element cannot be
+     *         encoded as the first element's type
+     */
+    protected void writeArrayEntries(List<Object> array)
+    {
+        Type type = array.isEmpty() ? Type.VOID : encoding(array.get(0));
+
+        put(type.code);
+        // the decoder reads an element count right after the type code
+        writeLong(array.size());
+
+        for (Object value : array)
+        {
+            write(type, value);
+        }
+    }
+
+    /**
+     * Writes the length prefix for a value of the given type. Fixed-width
+     * types have no prefix, so the size is only checked against the type's
+     * width; other types get a prefix of 1, 2 or 4 bytes.
+     *
+     * @throws IllegalArgumentException if size differs from a fixed width
+     * @throws IllegalStateException if a variable-width type declares a
+     *         prefix width other than 1, 2 or 4
+     */
+    private void writeSize(Type t, int size)
+    {
+        if (t.fixed)
+        {
+            if (size == t.width)
+            {
+                return;
+            }
+            throw new IllegalArgumentException
+                ("size does not match fixed width " + t.width + ": " +
+                 size);
+        }
+
+        // XXX: should check lengths
+        final int prefix = t.width;
+        if (prefix == 1)
+        {
+            writeOctet((short) size);
+        }
+        else if (prefix == 2)
+        {
+            writeShort(size);
+        }
+        else if (prefix == 4)
+        {
+            writeLong(size);
+        }
+        else
+        {
+            throw new IllegalStateException("irregular width: " + t);
+        }
+    }
+
+    /**
+     * Writes a byte payload for the given type: the length prefix (or
+     * fixed-width check) from writeSize, then the bytes themselves.
+     */
+    private void writeBytes(Type t, byte[] bytes)
+    {
+        final int length = bytes.length;
+        writeSize(t, length);
+        put(bytes);
+    }
+
+    /**
+     * Casts the value to the requested class.
+     *
+     * @throws IllegalArgumentException if the value is null or not an
+     *         instance of klass; the message is the value's string form
+     */
+    private <T> T coerce(Class<T> klass, Object value)
+    {
+        if (!klass.isInstance(value))
+        {
+            throw new IllegalArgumentException("" + value);
+        }
+        return klass.cast(value);
+    }
+
+    /**
+     * Encodes one value as the given field-table type. The value must be
+     * an instance of the Java class expected for that type (checked by
+     * coerce); strings are encoded with the platform charset; VOID writes
+     * nothing. Types without an explicit case are written as raw bytes.
+     *
+     * @param t the wire type to encode as; must not be null
+     * @param value the value to encode
+     * @throws IllegalArgumentException if the value has the wrong class
+     *         for the type
+     */
+    private void write(Type t, Object value)
+    {
+        switch (t)
+        {
+        case OCTET:
+        case UNSIGNED_BYTE:
+            writeOctet(coerce(Short.class, value));
+            break;
+        case SIGNED_BYTE:
+            put(coerce(Byte.class, value));
+            break;
+        case CHAR:
+            // only the low eight bits of the character are written
+            put((byte) ((char)coerce(Character.class, value)));
+            break;
+        case BOOLEAN:
+            put((byte) (coerce(Boolean.class, value) ? 1 : 0));
+            break;
+
+        case TWO_OCTETS:
+        case UNSIGNED_SHORT:
+            writeShort(coerce(Integer.class, value));
+            break;
+
+        case SIGNED_SHORT:
+            writeShort(coerce(Short.class, value));
+            break;
+
+        case FOUR_OCTETS:
+        case UNSIGNED_INT:
+            writeLong(coerce(Long.class, value));
+            break;
+
+        case UTF32_CHAR:
+        case SIGNED_INT:
+            writeLong(coerce(Integer.class, value));
+            break;
+
+        case FLOAT:
+            writeLong(Float.floatToIntBits(coerce(Float.class, value)));
+            break;
+
+        case EIGHT_OCTETS:
+        case SIGNED_LONG:
+        case UNSIGNED_LONG:
+        case DATETIME:
+            writeLonglong(coerce(Long.class, value));
+            break;
+
+        case DOUBLE:
+            writeLonglong(Double.doubleToLongBits(coerce(Double.class, value)));
+            break;
+
+        case SIXTEEN_OCTETS:
+        case THIRTY_TWO_OCTETS:
+        case SIXTY_FOUR_OCTETS:
+        case _128_OCTETS:
+        case SHORT_BINARY:
+        case BINARY:
+        case LONG_BINARY:
+            writeBytes(t, coerce(byte[].class, value));
+            break;
+
+        case UUID:
+            writeUuid(coerce(UUID.class, value));
+            break;
+
+        case SHORT_STRING:
+        case SHORT_UTF8_STRING:
+        case SHORT_UTF16_STRING:
+        case SHORT_UTF32_STRING:
+        case STRING:
+        case UTF8_STRING:
+        case UTF16_STRING:
+        case UTF32_STRING:
+        case LONG_STRING:
+        case LONG_UTF8_STRING:
+        case LONG_UTF16_STRING:
+        case LONG_UTF32_STRING:
+            // XXX: need to do character conversion
+            writeBytes(t, coerce(String.class, value).getBytes());
+            break;
+
+        case TABLE:
+            writeTable((Map<String,Object>) coerce(Map.class, value));
+            break;
+        case SEQUENCE:
+            writeSequence(coerce(List.class, value));
+            break;
+        case ARRAY:
+            writeArray(coerce(List.class, value));
+            break;
+
+        case FIVE_OCTETS:
+        case DECIMAL:
+        case NINE_OCTETS:
+        case LONG_DECIMAL:
+            // XXX: what types are we supposed to use here?
+            writeBytes(t, coerce(byte[].class, value));
+            break;
+
+        case VOID:
+            break;
+
+        default:
+            writeBytes(t, coerce(byte[].class, value));
+            break;
+        }
 }
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java Mon Sep 24 07:26:14 2007
@@ -40,12 +40,12 @@
this.in = in;
}
- protected byte get()
+ protected byte doGet()
{
return in.get();
}
- protected void get(byte[] bytes)
+ protected void doGet(byte[] bytes)
{
in.get(bytes);
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java Mon Sep 24 07:26:14 2007
@@ -39,12 +39,12 @@
this.out = out;
}
- @Override protected void put(byte b)
+ protected void doPut(byte b)
{
out.put(b);
}
- @Override protected void put(ByteBuffer src)
+ protected void doPut(ByteBuffer src)
{
out.put(src);
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java Mon Sep 24 07:26:14 2007
@@ -20,6 +20,7 @@
*/
package org.apache.qpidity.codec;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -47,12 +48,15 @@
String readShortstr();
String readLongstr();
- Map<String,?> readTable();
RangeSet readRfc1982LongSet();
UUID readUuid();
String readContent();
Struct readLongStruct();
+
+ Map<String,Object> readTable();
+ List<Object> readSequence();
+ List<Object> readArray();
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java Mon Sep 24 07:26:14 2007
@@ -20,6 +20,7 @@
*/
package org.apache.qpidity.codec;
+import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -36,6 +37,8 @@
public interface Encoder
{
+ void flush();
+
void writeBit(boolean b);
void writeOctet(short b);
void writeShort(int s);
@@ -47,14 +50,15 @@
void writeShortstr(String s);
void writeLongstr(String s);
- void writeTable(Map<String,?> table);
void writeRfc1982LongSet(RangeSet ranges);
void writeUuid(UUID uuid);
void writeContent(String c);
- void flush();
-
void writeLongStruct(Struct s);
+
+ void writeTable(Map<String,Object> table);
+ void writeSequence(List<Object> sequence);
+ void writeArray(List<Object> array);
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java Mon Sep 24 07:26:14 2007
@@ -87,7 +87,7 @@
}
}
- @Override protected byte get()
+ protected byte doGet()
{
preRead();
byte b = current.get();
@@ -95,7 +95,7 @@
return b;
}
- @Override protected void get(byte[] bytes)
+ protected void doGet(byte[] bytes)
{
int remaining = bytes.length;
while (remaining > 0)
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java Mon Sep 24 07:26:14 2007
@@ -41,7 +41,7 @@
}
public SizeEncoder(byte major, byte minor, int size) {
- super(major, minor);
+ super(major, minor, false);
this.size = size;
}
@@ -53,31 +53,14 @@
this.size = size;
}
- @Override protected void put(byte b)
+ protected void doPut(byte b)
{
size += 1;
}
- @Override protected void put(ByteBuffer src)
+ protected void doPut(ByteBuffer src)
{
size += src.remaining();
- }
-
- @Override public void writeShortstr(String s)
- {
- if (s == null) { s = ""; }
- if (s.length() > 255) {
- throw new IllegalArgumentException(s);
- }
- writeOctet((byte) s.length());
- size += s.length();
- }
-
- @Override public void writeLongstr(String s)
- {
- if (s == null) { s = ""; }
- writeLong(s.length());
- size += s.length();
}
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java Mon Sep 24 07:26:14 2007
@@ -106,7 +106,7 @@
public void error(Void v, ProtocolError error)
{
- error.delegate(session, sessionDelegate);
+ throw new RuntimeException(error.getMessage());
}
public void closed()
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java Mon Sep 24 07:26:14 2007
@@ -117,7 +117,7 @@
// need error handling
}
- Map<String,?> props = new HashMap<String,String>();
+ Map<String,Object> props = new HashMap<String,Object>();
context.connectionStartOk(props, mechanism, response, _locale);
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java Mon Sep 24 07:26:14 2007
@@ -27,6 +27,8 @@
import org.apache.qpidity.transport.ProtocolHeader;
import org.apache.qpidity.transport.Receiver;
+import static org.apache.qpidity.transport.util.Functions.*;
+
import static org.apache.qpidity.transport.network.InputHandler.State.*;
@@ -147,12 +149,20 @@
type = buf.get();
return FRAME_HDR_SIZE1;
case FRAME_HDR_SIZE1:
- size = buf.get() << 8;
+ size = (0xFF & buf.get()) << 8;
return FRAME_HDR_SIZE2;
case FRAME_HDR_SIZE2:
- size += buf.get();
+ size += 0xFF & buf.get();
size -= 12;
- return FRAME_HDR_RSVD1;
+ if (size < 0 || size > (64*1024 - 12))
+ {
+ error("bad frame size: %d", size);
+ return ERROR;
+ }
+ else
+ {
+ return FRAME_HDR_RSVD1;
+ }
case FRAME_HDR_RSVD1:
return expect(buf, 0, FRAME_HDR_TRACK);
case FRAME_HDR_TRACK:
@@ -166,10 +176,10 @@
return FRAME_HDR_CH1;
}
case FRAME_HDR_CH1:
- channel = buf.get() << 8;
+ channel = (0xFF & buf.get()) << 8;
return FRAME_HDR_CH2;
case FRAME_HDR_CH2:
- channel += buf.get();
+ channel += 0xFF & buf.get();
return FRAME_HDR_RSVD2;
case FRAME_HDR_RSVD2:
return expect(buf, 0, FRAME_HDR_RSVD3);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java Mon Sep 24 07:26:14 2007
@@ -32,21 +32,6 @@
public class Functions
{
- public static final short unsigned(byte b)
- {
- return (short) ((0x100 + b) & 0xFF);
- }
-
- public static final int unsigned(short s)
- {
- return (0x10000 + s) & 0xFFFF;
- }
-
- public static final long unsigned(int i)
- {
- return (0x1000000000L + i) & 0xFFFFFFFFL;
- }
-
public static final byte lsb(int i)
{
return (byte) (0xFF & i);
@@ -65,13 +50,13 @@
public static final String str(ByteBuffer buf, int limit)
{
StringBuilder str = new StringBuilder();
- for (int i = 0; i < limit; i++)
+ for (int i = 0; i < buf.remaining(); i++)
{
if (i > 0 && i % 2 == 0)
{
str.append(" ");
}
- str.append(String.format("%02x", buf.get(i)));
+ str.append(String.format("%02x", buf.get(buf.position() + i)));
}
return str.toString();