You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/09/24 16:26:16 UTC

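svn commit: r578827 - in /incubator/qpid/trunk/qpid/java: client/src/main/java/org/apache/qpidity/nclient/ client/src/main/java/org/apache/qpidity/nclient/impl/ common/ common/src/main/java/org/apache/qpidity/ common/src/main/java/org/apache/qpidity/codec/ common/src/main/java/org/apache/qpidity/transport/ common/src/main/java/org/apache/qpidity/transport/network/ common/src/main/java/org/apache/qpidity/transport/util/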

Author: rhs
Date: Mon Sep 24 07:26:14 2007
New Revision: 578827

URL: http://svn.apache.org/viewvc?rev=578827&view=rev
Log:
added field table encoding/decoding

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
    incubator/qpid/trunk/qpid/java/common/generate
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
    incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Mon Sep 24 07:26:14 2007
@@ -260,7 +260,7 @@
      *                    on the providers implementation.
      */
     public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
-                                 MessagePartListener listener, Map<String, ?> filter, Option... options);
+                                 MessagePartListener listener, Map<String, Object> filter, Option... options);
 
     /**
      * This method cancels a consumer. This does not affect already delivered messages, but it does
@@ -481,7 +481,7 @@
      *                          {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and  {@link Option#NO_OPTION})
      * @see Option
      */
-    public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options);
+    public void queueDeclare(String queueName, String alternateExchange, Map<String, Object> arguments, Option... options);
 
     /**
      * Bind a queue with an exchange.
@@ -498,7 +498,7 @@
      *                     routing keys depends on the exchange implementation.
      * @param arguments    Used for backward compatibility
      */
-    public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
+    public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
 
     /**
      * Unbind a queue from an exchange.
@@ -508,7 +508,7 @@
      * @param routingKey   Specifies the routing key of the binding to unbind.
      * @param arguments    Used for backward compatibility
      */
-    public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
+    public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, Object> arguments);
 
     /**
      * This method removes all messages from a queue. It does not cancel consumers. Purged messages
@@ -573,7 +573,7 @@
      * @param arguments         Used for backward compatibility
      * @see Option
      */
-    public void exchangeDeclare(String exchangeName, String type, String alternateExchange, Map<String, ?> arguments,
+    public void exchangeDeclare(String exchangeName, String type, String alternateExchange, Map<String, Object> arguments,
                                 Option... options);
 
     /**

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Mon Sep 24 07:26:14 2007
@@ -35,7 +35,7 @@
         }
     }
 
-    public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options)
+    public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, Object> filter, Option... options)
     {
         setMessageListener(destination,listener);
         super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options);

Modified: incubator/qpid/trunk/qpid/java/common/generate
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/generate?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/generate (original)
+++ incubator/qpid/trunk/qpid/java/common/generate Mon Sep 24 07:26:14 2007
@@ -21,6 +21,7 @@
 
     self.line("package %s;" % self.package)
     self.line()
+    self.line("import java.util.List;")
     self.line("import java.util.Map;")
     self.line("import java.util.UUID;")
     self.line()
@@ -54,12 +55,55 @@
   "short": "int",
   "octet": "short",
   "bit": "boolean",
-  "table": "Map<String,?>",
+  "table": "Map<String,Object>",
   "timestamp": "long",
   "content": "String",
   "uuid": "UUID",
   "rfc1982-long-set": "RangeSet",
-  "long-struct": "Struct"
+  "long-struct": "Struct",
+  "signed-byte": "byte",
+  "unsigned-byte": "short",
+  "char": "char",
+  "boolean": "boolean",
+  "two-octets": "short",
+  "signed-short": "short",
+  "unsigned-short": "int",
+  "four-octets": "int",
+  "signed-int": "int",
+  "unsigned-int": "long",
+  "float": "float",
+  "utf32-char": "char",
+  "eight-octets": "long",
+  "signed-long": "long",
+  "unsigned-long": "long",
+  "double": "double",
+  "datetime": "long",
+  "sixteen-octets": "byte[]",
+  "thirty-two-octets": "byte[]",
+  "sixty-four-octets": "byte[]",
+  "_128-octets": "byte[]",
+  "short-binary": "byte[]",
+  "short-string": "String",
+  "short-utf8-string": "String",
+  "short-utf16-string": "String",
+  "short-utf32-string": "String",
+  "binary": "byte[]",
+  "string": "String",
+  "utf8-string": "String",
+  "utf16-string": "String",
+  "utf32-string": "String",
+  "long-binary": "byte[]",
+  "long-string": "String",
+  "long-utf8-string": "String",
+  "long-utf16-string": "String",
+  "long-utf32-string": "String",
+  "sequence": "List<Object>",
+  "array": "List<Object>",
+  "five-octets": "byte[]",
+  "decimal": "byte[]",
+  "nine-octets": "byte[]",
+  "long-decimal": "byte[]",
+  "void": "Void"
   }
 
 TRACKS = {
@@ -80,6 +124,54 @@
 
 def scream(*args):
   return "_".join([a.replace("-", "_").upper() for a in args])
+
+
+types = Output(out_dir, out_pkg, "Type")
+types.line("public enum Type")
+types.line("{")
+codes = {}
+for c in spec.query["amqp/constant"]:
+  if c["@class"] == "field-table-type":
+    name = c["@name"]
+    if name.startswith("field-table-"):
+      name = name[12:]
+    if name[0].isdigit():
+      name = "_" + name
+    val = c["@value"]
+    codes[val] = name
+    if c["@width"] != None:
+      width = c["@width"]
+      fixed = "true"
+    if c["@lfwidth"] != None:
+      width = c["@lfwidth"]
+      fixed = "false"
+    types.line("    %s((byte) %s, %s, %s)," %
+               (scream(name), val, width, fixed))
+types.line("    ;")
+
+types.line("    public byte code;")
+types.line("    public int width;")
+types.line("    public boolean fixed;")
+
+types.line("    Type(byte code, int width, boolean fixed)")
+types.line("    {")
+for arg in ("code", "width", "fixed"):
+  types.line("        this.%s = %s;" % (arg, arg))
+types.line("    }")
+
+types.line("    public static Type get(byte code)")
+types.line("    {")
+types.line("        switch (code)")
+types.line("        {")
+for code, name in codes.items():
+  types.line("        case (byte) %s: return %s;" % (code, scream(name)))
+types.line("        default: return null;")
+types.line("        }")
+types.line("    }")
+
+types.line("}")
+types.write()
+
 
 const = Output(out_dir, out_pkg, "Constant")
 const.line("public interface Constant")

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java Mon Sep 24 07:26:14 2007
@@ -122,6 +122,12 @@
         {
             System.out.println("received headers routing_key " + props.getRoutingKey());
         }
+        MessageProperties mp = header.get(MessageProperties.class);
+        System.out.println("MP: " + mp);
+        if (mp != null)
+        {
+            System.out.println(mp.getApplicationHeaders());
+        }
 
         this.header = header;
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyClient.java Mon Sep 24 07:26:14 2007
@@ -20,6 +20,8 @@
  */
 package org.apache.qpidity;
 
+import java.util.*;
+
 import org.apache.qpidity.transport.*;
 import org.apache.qpidity.transport.network.mina.MinaHandler;
 
@@ -78,9 +80,32 @@
         ssn.queueDeclare("asdf", null, null);
         ssn.sync();
 
+        Map<String,Object> nested = new LinkedHashMap<String,Object>();
+        nested.put("list", Arrays.asList("one", "two", "three"));
+        Map<String,Object> map = new LinkedHashMap<String,Object>();
+
+        map.put("str", "this is a string");
+
+        map.put("+int", 3);
+        map.put("-int", -3);
+        map.put("maxint", Integer.MAX_VALUE);
+        map.put("minint", Integer.MIN_VALUE);
+
+        map.put("+short", (short) 1);
+        map.put("-short", (short) -1);
+        map.put("maxshort", (short) Short.MAX_VALUE);
+        map.put("minshort", (short) Short.MIN_VALUE);
+
+        map.put("float", (float) 3.3);
+        map.put("double", 4.9);
+        map.put("char", 'c');
+
+        map.put("table", nested);
+        map.put("list", Arrays.asList(1, 2, 3));
+
         ssn.messageTransfer("asdf", (short) 0, (short) 1);
         ssn.header(new DeliveryProperties(),
-                   new MessageProperties());
+                   new MessageProperties().setApplicationHeaders(map));
         ssn.data("this is the data");
         ssn.endData();
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractDecoder.java Mon Sep 24 07:26:14 2007
@@ -20,11 +20,15 @@
  */
 package org.apache.qpidity.codec;
 
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpidity.transport.RangeSet;
 import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.Type;
 
 import static org.apache.qpidity.transport.util.Functions.*;
 
@@ -40,20 +44,37 @@
 
     private final byte major;
     private final byte minor;
+    private int count;
 
     protected AbstractDecoder(byte major, byte minor)
     {
         this.major = major;
         this.minor = minor;
+        this.count = 0;
     }
 
-    protected abstract byte get();
+    protected abstract byte doGet();
 
-    protected abstract void get(byte[] bytes);
+    protected abstract void doGet(byte[] bytes);
+
+    protected byte get()
+    {
+        clearBits();
+        byte b = doGet();
+        count += 1;
+        return b;
+    }
+
+    protected void get(byte[] bytes)
+    {
+        clearBits();
+        doGet(bytes);
+        count += bytes.length;
+    }
 
     protected short uget()
     {
-        return unsigned(get());
+        return (short) (0xFF & get());
     }
 
     private byte bits = 0x0;
@@ -81,13 +102,11 @@
 
     public short readOctet()
     {
-        clearBits();
         return uget();
     }
 
     public int readShort()
     {
-        clearBits();
         int i = uget() << 8;
         i |= uget();
         return i;
@@ -95,7 +114,6 @@
 
     public long readLong()
     {
-        clearBits();
         long l = uget() << 24;
         l |= uget() << 16;
         l |= uget() << 8;
@@ -105,15 +123,11 @@
 
     public long readLonglong()
     {
-        clearBits();
-        long l = uget() << 56;
-        l |= uget() << 48;
-        l |= uget() << 40;
-        l |= uget() << 32;
-        l |= uget() << 24;
-        l |= uget() << 16;
-        l |= uget() << 8;
-        l |= uget();
+        long l = 0;
+        for (int i = 0; i < 8; i++)
+        {
+            l |= ((long) (0xFF & get())) << (56 - i*8);
+        }
         return l;
     }
 
@@ -134,18 +148,11 @@
     public String readLongstr()
     {
         long size = readLong();
-        assert size <= Integer.MAX_VALUE;
         byte[] bytes = new byte[(int) size];
         get(bytes);
         return new String(bytes);
     }
 
-    public Map<String,?> readTable()
-    {
-        //throw new Error("TODO");
-        return null;
-    }
-
     public RangeSet readRfc1982LongSet()
     {
         int count = readShort()/8;
@@ -189,6 +196,175 @@
             Struct result = Struct.create(type);
             result.read(this, major, minor);
             return result;
+        }
+    }
+
+    public Map<String,Object> readTable()
+    {
+        long size = readLong();
+        int start = count;
+        Map<String,Object> result = new LinkedHashMap();
+        while (count < start + size)
+        {
+            String key = readShortstr();
+            byte code = get();
+            Type t = Type.get(code);
+            Object value = read(t);
+            result.put(key, value);
+        }
+        return result;
+    }
+
+    public List<Object> readSequence()
+    {
+        long size = readLong();
+        int start = count;
+        List<Object> result = new ArrayList();
+        while (count < start + size)
+        {
+            byte code = get();
+            Type t = Type.get(code);
+            Object value = read(t);
+            result.add(value);
+        }
+        return result;
+    }
+
+    public List<Object> readArray()
+    {
+        long size = readLong();
+        byte code = get();
+        Type t = Type.get(code);
+        long count = readLong();
+
+        List<Object> result = new ArrayList<Object>();
+        for (int i = 0; i < count; i++)
+        {
+            Object value = read(t);
+            result.add(value);
+        }
+        return result;
+    }
+
+    private long readSize(Type t)
+    {
+        if (t.fixed)
+        {
+            return t.width;
+        }
+        else
+        {
+            switch (t.width)
+            {
+            case 1:
+                return readOctet();
+            case 2:
+                return readShort();
+            case 4:
+                return readLong();
+            default:
+                throw new IllegalStateException("irregular width: " + t);
+            }
+        }
+    }
+
+    private byte[] readBytes(Type t)
+    {
+        long size = readSize(t);
+        byte[] result = new byte[(int) size];
+        get(result);
+        return result;
+    }
+
+    private Object read(Type t)
+    {
+        switch (t)
+        {
+        case OCTET:
+        case UNSIGNED_BYTE:
+            return readOctet();
+        case SIGNED_BYTE:
+            return get();
+        case CHAR:
+            return (char) get();
+        case BOOLEAN:
+            return get() > 0;
+
+        case TWO_OCTETS:
+        case UNSIGNED_SHORT:
+            return readShort();
+
+        case SIGNED_SHORT:
+            return (short) readShort();
+
+        case FOUR_OCTETS:
+        case UNSIGNED_INT:
+            return readLong();
+
+        case UTF32_CHAR:
+        case SIGNED_INT:
+            return (int) readLong();
+
+        case FLOAT:
+            return Float.intBitsToFloat((int) readLong());
+
+        case EIGHT_OCTETS:
+        case SIGNED_LONG:
+        case UNSIGNED_LONG:
+        case DATETIME:
+            return readLonglong();
+
+        case DOUBLE:
+            long bits = readLonglong();
+            System.out.println("double in: " + bits);
+            return Double.longBitsToDouble(bits);
+
+        case SIXTEEN_OCTETS:
+        case THIRTY_TWO_OCTETS:
+        case SIXTY_FOUR_OCTETS:
+        case _128_OCTETS:
+        case SHORT_BINARY:
+        case BINARY:
+        case LONG_BINARY:
+            return readBytes(t);
+
+        case UUID:
+            return readUuid();
+
+        case SHORT_STRING:
+        case SHORT_UTF8_STRING:
+        case SHORT_UTF16_STRING:
+        case SHORT_UTF32_STRING:
+        case STRING:
+        case UTF8_STRING:
+        case UTF16_STRING:
+        case UTF32_STRING:
+        case LONG_STRING:
+        case LONG_UTF8_STRING:
+        case LONG_UTF16_STRING:
+        case LONG_UTF32_STRING:
+            // XXX: need to do character conversion
+            return new String(readBytes(t));
+
+        case TABLE:
+            return readTable();
+        case SEQUENCE:
+            return readSequence();
+        case ARRAY:
+            return readArray();
+
+        case FIVE_OCTETS:
+        case DECIMAL:
+        case NINE_OCTETS:
+        case LONG_DECIMAL:
+            // XXX: what types are we supposed to use here?
+            return readBytes(t);
+
+        case VOID:
+            return null;
+
+        default:
+            return readBytes(t);
         }
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/AbstractEncoder.java Mon Sep 24 07:26:14 2007
@@ -22,12 +22,15 @@
 
 import java.nio.ByteBuffer;
 
+import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
 import org.apache.qpidity.transport.Range;
 import org.apache.qpidity.transport.RangeSet;
 import org.apache.qpidity.transport.Struct;
+import org.apache.qpidity.transport.Type;
 
 import static org.apache.qpidity.transport.util.Functions.*;
 
@@ -41,18 +44,57 @@
 abstract class AbstractEncoder implements Encoder
 {
 
+    private static Map<Class<?>,Type> ENCODINGS = new HashMap<Class<?>,Type>();
+    static
+    {
+        ENCODINGS.put(String.class, Type.LONG_STRING);
+        ENCODINGS.put(Long.class, Type.SIGNED_LONG);
+        ENCODINGS.put(Integer.class, Type.SIGNED_INT);
+        ENCODINGS.put(Short.class, Type.SIGNED_SHORT);
+        ENCODINGS.put(Byte.class, Type.SIGNED_BYTE);
+        ENCODINGS.put(Map.class, Type.TABLE);
+        ENCODINGS.put(List.class, Type.SEQUENCE);
+        ENCODINGS.put(Float.class, Type.FLOAT);
+        ENCODINGS.put(Double.class, Type.DOUBLE);
+        ENCODINGS.put(Character.class, Type.CHAR);
+    }
+
     private final byte major;
     private final byte minor;
+    private final boolean calcsize;
 
-    protected AbstractEncoder(byte major, byte minor)
+    protected AbstractEncoder(byte major, byte minor, boolean calcsize)
     {
         this.major = major;
         this.minor = minor;
+        this.calcsize = calcsize;
+    }
+
+    protected AbstractEncoder(byte major, byte minor)
+    {
+        this(major, minor, true);
     }
 
-    protected abstract void put(byte b);
+    protected abstract void doPut(byte b);
 
-    protected abstract void put(ByteBuffer src);
+    protected abstract void doPut(ByteBuffer src);
+
+    protected void put(byte b)
+    {
+        flushBits();
+        doPut(b);
+    }
+
+    protected void put(ByteBuffer src)
+    {
+        flushBits();
+        doPut(src);
+    }
+
+    protected void put(byte[] bytes)
+    {
+        put(ByteBuffer.wrap(bytes));
+    }
 
     private byte bits = 0x0;
     private byte nbits = 0;
@@ -76,17 +118,21 @@
     {
         if (nbits > 0)
         {
-            put(bits);
+            doPut(bits);
             bits = 0x0;
             nbits = 0;
         }
     }
 
+    public void flush()
+    {
+        flushBits();
+    }
+
     public void writeOctet(short b)
     {
         assert b < 0x100;
 
-        flushBits();
         put((byte) b);
     }
 
@@ -94,7 +140,6 @@
     {
         assert s < 0x10000;
 
-        flushBits();
         put(lsb(s >>> 8));
         put(lsb(s));
     }
@@ -103,7 +148,6 @@
     {
         assert i < 0x100000000L;
 
-        flushBits();
         put(lsb(i >>> 24));
         put(lsb(i >>> 16));
         put(lsb(i >>> 8));
@@ -112,21 +156,15 @@
 
     public void writeLonglong(long l)
     {
-        flushBits();
-        put(lsb(l >>> 56));
-        put(lsb(l >>> 48));
-        put(lsb(l >>> 40));
-        put(lsb(l >>> 32));
-        put(lsb(l >>> 24));
-        put(lsb(l >>> 16));
-        put(lsb(l >>> 8));
-        put(lsb(l));
+        for (int i = 0; i < 8; i++)
+        {
+            put(lsb(l >> (56 - i*8)));
+        }
     }
 
 
     public void writeTimestamp(long l)
     {
-        flushBits();
         writeLonglong(l);
     }
 
@@ -149,11 +187,6 @@
     }
 
 
-    public void writeTable(Map<String,?> table)
-    {
-        //throw new Error("TODO");
-    }
-
     public void writeRfc1982LongSet(RangeSet ranges)
     {
         if (ranges == null)
@@ -197,19 +230,330 @@
         }
         else
         {
-            SizeEncoder sizer = new SizeEncoder(major, minor);
-            sizer.writeShort(s.getEncodedType());
-            s.write(sizer, major, minor);
+            int size = 0;
+            if (calcsize)
+            {
+                SizeEncoder sizer = new SizeEncoder(major, minor);
+                sizer.writeShort(s.getEncodedType());
+                s.write(sizer, major, minor);
+                size = sizer.getSize();
+            }
 
-            writeLong(sizer.getSize());
+            writeLong(size);
             writeShort(s.getEncodedType());
             s.write(this, major, minor);
         }
     }
 
-    public void flush()
+    private Type encoding(Object value)
     {
-        flushBits();
+        if (value == null)
+        {
+            return Type.VOID;
+        }
+
+        Class klass = value.getClass();
+        Type type = resolve(klass);
+
+        if (type == null)
+        {
+            throw new IllegalArgumentException
+                ("unable to resolve type: " + klass + ", " + value);
+        }
+        else
+        {
+            return type;
+        }
+    }
+
+    private Type resolve(Class klass)
+    {
+        Type type = ENCODINGS.get(klass);
+        if (type != null)
+        {
+            return type;
+        }
+
+        Class sup = klass.getSuperclass();
+        if (sup != null)
+        {
+            type = resolve(klass.getSuperclass());
+
+            if (type != null)
+            {
+                return type;
+            }
+        }
+
+        for (Class iface : klass.getInterfaces())
+        {
+            type = resolve(iface);
+            if (type != null)
+            {
+                return type;
+            }
+        }
+
+        return null;
+    }
+
+    public void writeTable(Map<String,Object> table)
+    {
+        if (table == null)
+        {
+            writeLong(0);
+            return;
+        }
+
+        int size = 0;
+        if (calcsize)
+        {
+            SizeEncoder sizer = new SizeEncoder(major, minor);
+            sizer.writeTableEntries(table);
+            size = sizer.getSize();
+        }
+
+        writeLong(size);
+        writeTableEntries(table);
+    }
+
+    protected void writeTableEntries(Map<String,Object> table)
+    {
+        for (Map.Entry<String,Object> entry : table.entrySet())
+        {
+            String key = entry.getKey();
+            Object value = entry.getValue();
+            Type type = encoding(value);
+            writeShortstr(key);
+            put(type.code);
+            write(type, value);
+        }
+    }
+
+    public void writeSequence(List<Object> sequence)
+    {
+        int size = 0;
+        if (calcsize)
+        {
+            SizeEncoder sizer = new SizeEncoder(major, minor);
+            sizer.writeSequenceEntries(sequence);
+            size = sizer.getSize();
+        }
+
+        writeLong(size);
+        writeSequenceEntries(sequence);
+    }
+
+    protected void writeSequenceEntries(List<Object> sequence)
+    {
+        for (Object value : sequence)
+        {
+            Type type = encoding(value);
+            put(type.code);
+            write(type, value);
+        }
+    }
+
+    public void writeArray(List<Object> array)
+    {
+        int size = 0;
+        if (calcsize)
+        {
+            SizeEncoder sizer = new SizeEncoder(major, minor);
+            sizer.writeArrayEntries(array);
+            size = sizer.getSize();
+        }
+
+        writeLong(size);
+        writeArrayEntries(array);
+    }
+
+    protected void writeArrayEntries(List<Object> array)
+    {
+        Type type;
+
+        if (array.isEmpty())
+        {
+            type = Type.VOID;
+        }
+        else
+        {
+            type = encoding(array.get(0));
+        }
+
+        put(type.code);
+
+        for (Object value : array)
+        {
+            write(type, value);
+        }
+    }
+
+    private void writeSize(Type t, int size)
+    {
+        if (t.fixed)
+        {
+            if (size != t.width)
+            {
+                throw new IllegalArgumentException
+                    ("size does not match fixed width " + t.width + ": " +
+                     size);
+            }
+        }
+        else
+        {
+            // XXX: should check lengths
+            switch (t.width)
+            {
+            case 1:
+                writeOctet((short) size);
+                break;
+            case 2:
+                writeShort(size);
+                break;
+            case 4:
+                writeLong(size);
+                break;
+            default:
+                throw new IllegalStateException("irregular width: " + t);
+            }
+        }
+    }
+
+    private void writeBytes(Type t, byte[] bytes)
+    {
+        writeSize(t, bytes.length);
+        put(bytes);
+    }
+
+    private <T> T coerce(Class<T> klass, Object value)
+    {
+        if (klass.isInstance(value))
+        {
+            return klass.cast(value);
+        }
+        else
+        {
+            throw new IllegalArgumentException("" + value);
+        }
+    }
+
+    private void write(Type t, Object value)
+    {
+        switch (t)
+        {
+        case OCTET:
+        case UNSIGNED_BYTE:
+            writeOctet(coerce(Short.class, value));
+            break;
+        case SIGNED_BYTE:
+            put(coerce(Byte.class, value));
+            break;
+        case CHAR:
+            put((byte) ((char)coerce(Character.class, value)));
+            break;
+        case BOOLEAN:
+            if (coerce(Boolean.class, value))
+            {
+                put((byte) 1);
+            }
+            else
+            {
+                put((byte) 0);
+            }
+            break;
+
+        case TWO_OCTETS:
+        case UNSIGNED_SHORT:
+            writeShort(coerce(Integer.class, value));
+            break;
+
+        case SIGNED_SHORT:
+            writeShort(coerce(Short.class, value));
+            break;
+
+        case FOUR_OCTETS:
+        case UNSIGNED_INT:
+            writeLong(coerce(Long.class, value));
+            break;
+
+        case UTF32_CHAR:
+        case SIGNED_INT:
+            writeLong(coerce(Integer.class, value));
+            break;
+
+        case FLOAT:
+            writeLong(Float.floatToIntBits(coerce(Float.class, value)));
+            break;
+
+        case EIGHT_OCTETS:
+        case SIGNED_LONG:
+        case UNSIGNED_LONG:
+        case DATETIME:
+            writeLonglong(coerce(Long.class, value));
+            break;
+
+        case DOUBLE:
+            long bits = Double.doubleToLongBits(coerce(Double.class, value));
+            System.out.println("double out: " + bits);
+            writeLonglong(bits);
+            break;
+
+        case SIXTEEN_OCTETS:
+        case THIRTY_TWO_OCTETS:
+        case SIXTY_FOUR_OCTETS:
+        case _128_OCTETS:
+        case SHORT_BINARY:
+        case BINARY:
+        case LONG_BINARY:
+            writeBytes(t, coerce(byte[].class, value));
+            break;
+
+        case UUID:
+            writeUuid(coerce(UUID.class, value));
+            break;
+
+        case SHORT_STRING:
+        case SHORT_UTF8_STRING:
+        case SHORT_UTF16_STRING:
+        case SHORT_UTF32_STRING:
+        case STRING:
+        case UTF8_STRING:
+        case UTF16_STRING:
+        case UTF32_STRING:
+        case LONG_STRING:
+        case LONG_UTF8_STRING:
+        case LONG_UTF16_STRING:
+        case LONG_UTF32_STRING:
+            // XXX: need to do character conversion
+            writeBytes(t, coerce(String.class, value).getBytes());
+            break;
+
+        case TABLE:
+            writeTable((Map<String,Object>) coerce(Map.class, value));
+            break;
+        case SEQUENCE:
+            writeSequence(coerce(List.class, value));
+            break;
+        case ARRAY:
+            writeArray(coerce(List.class, value));
+            break;
+
+        case FIVE_OCTETS:
+        case DECIMAL:
+        case NINE_OCTETS:
+        case LONG_DECIMAL:
+            // XXX: what types are we supposed to use here?
+            writeBytes(t, coerce(byte[].class, value));
+            break;
+
+        case VOID:
+            break;
+
+        default:
+            writeBytes(t, coerce(byte[].class, value));
+            break;
+        }
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBDecoder.java Mon Sep 24 07:26:14 2007
@@ -40,12 +40,12 @@
         this.in = in;
     }
 
-    protected byte get()
+    protected byte doGet()
     {
         return in.get();
     }
 
-    protected void get(byte[] bytes)
+    protected void doGet(byte[] bytes)
     {
         in.get(bytes);
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/BBEncoder.java Mon Sep 24 07:26:14 2007
@@ -39,12 +39,12 @@
         this.out = out;
     }
 
-    @Override protected void put(byte b)
+    protected void doPut(byte b)
     {
         out.put(b);
     }
 
-    @Override protected void put(ByteBuffer src)
+    protected void doPut(ByteBuffer src)
     {
         out.put(src);
     }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Decoder.java Mon Sep 24 07:26:14 2007
@@ -20,6 +20,7 @@
  */
 package org.apache.qpidity.codec;
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -47,12 +48,15 @@
     String readShortstr();
     String readLongstr();
 
-    Map<String,?> readTable();
     RangeSet readRfc1982LongSet();
     UUID readUuid();
 
     String readContent();
 
     Struct readLongStruct();
+
+    Map<String,Object> readTable();
+    List<Object> readSequence();
+    List<Object> readArray();
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/Encoder.java Mon Sep 24 07:26:14 2007
@@ -20,6 +20,7 @@
  */
 package org.apache.qpidity.codec;
 
+import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
@@ -36,6 +37,8 @@
 public interface Encoder
 {
 
+    void flush();
+
     void writeBit(boolean b);
     void writeOctet(short b);
     void writeShort(int s);
@@ -47,14 +50,15 @@
     void writeShortstr(String s);
     void writeLongstr(String s);
 
-    void writeTable(Map<String,?> table);
     void writeRfc1982LongSet(RangeSet ranges);
     void writeUuid(UUID uuid);
 
     void writeContent(String c);
 
-    void flush();
-
     void writeLongStruct(Struct s);
+
+    void writeTable(Map<String,Object> table);
+    void writeSequence(List<Object> sequence);
+    void writeArray(List<Object> array);
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/FragmentDecoder.java Mon Sep 24 07:26:14 2007
@@ -87,7 +87,7 @@
         }
     }
 
-    @Override protected byte get()
+    protected byte doGet()
     {
         preRead();
         byte b = current.get();
@@ -95,7 +95,7 @@
         return b;
     }
 
-    @Override protected void get(byte[] bytes)
+    protected void doGet(byte[] bytes)
     {
         int remaining = bytes.length;
         while (remaining > 0)

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/codec/SizeEncoder.java Mon Sep 24 07:26:14 2007
@@ -41,7 +41,7 @@
     }
 
     public SizeEncoder(byte major, byte minor, int size) {
-        super(major, minor);
+        super(major, minor, false);
         this.size = size;
     }
 
@@ -53,31 +53,14 @@
         this.size = size;
     }
 
-    @Override protected void put(byte b)
+    protected void doPut(byte b)
     {
         size += 1;
     }
 
-    @Override protected void put(ByteBuffer src)
+    protected void doPut(ByteBuffer src)
     {
         size += src.remaining();
-    }
-
-    @Override public void writeShortstr(String s)
-    {
-        if (s == null) { s = ""; }
-        if (s.length() > 255) {
-            throw new IllegalArgumentException(s);
-        }
-        writeOctet((byte) s.length());
-        size += s.length();
-    }
-
-    @Override public void writeLongstr(String s)
-    {
-        if (s == null) { s = ""; }
-        writeLong(s.length());
-        size += s.length();
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/Channel.java Mon Sep 24 07:26:14 2007
@@ -106,7 +106,7 @@
 
     public void error(Void v, ProtocolError error)
     {
-        error.delegate(session, sessionDelegate);
+        throw new RuntimeException(error.getMessage());
     }
 
     public void closed()

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/ConnectionDelegate.java Mon Sep 24 07:26:14 2007
@@ -117,7 +117,7 @@
           //  need error handling
         }
 
-        Map<String,?> props = new HashMap<String,String>();
+        Map<String,Object> props = new HashMap<String,Object>();
         context.connectionStartOk(props, mechanism, response, _locale);
     }
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/network/InputHandler.java Mon Sep 24 07:26:14 2007
@@ -27,6 +27,8 @@
 import org.apache.qpidity.transport.ProtocolHeader;
 import org.apache.qpidity.transport.Receiver;
 
+import static org.apache.qpidity.transport.util.Functions.*;
+
 import static org.apache.qpidity.transport.network.InputHandler.State.*;
 
 
@@ -147,12 +149,20 @@
             type = buf.get();
             return FRAME_HDR_SIZE1;
         case FRAME_HDR_SIZE1:
-            size = buf.get() << 8;
+            size = (0xFF & buf.get()) << 8;
             return FRAME_HDR_SIZE2;
         case FRAME_HDR_SIZE2:
-            size += buf.get();
+            size += 0xFF & buf.get();
             size -= 12;
-            return FRAME_HDR_RSVD1;
+            if (size < 0 || size > (64*1024 - 12))
+            {
+                error("bad frame size: %d", size);
+                return ERROR;
+            }
+            else
+            {
+                return FRAME_HDR_RSVD1;
+            }
         case FRAME_HDR_RSVD1:
             return expect(buf, 0, FRAME_HDR_TRACK);
         case FRAME_HDR_TRACK:
@@ -166,10 +176,10 @@
                 return FRAME_HDR_CH1;
             }
         case FRAME_HDR_CH1:
-            channel = buf.get() << 8;
+            channel = (0xFF & buf.get()) << 8;
             return FRAME_HDR_CH2;
         case FRAME_HDR_CH2:
-            channel += buf.get();
+            channel += 0xFF & buf.get();
             return FRAME_HDR_RSVD2;
         case FRAME_HDR_RSVD2:
             return expect(buf, 0, FRAME_HDR_RSVD3);

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java?rev=578827&r1=578826&r2=578827&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/transport/util/Functions.java Mon Sep 24 07:26:14 2007
@@ -32,21 +32,6 @@
 public class Functions
 {
 
-    public static final short unsigned(byte b)
-    {
-        return (short) ((0x100 + b) & 0xFF);
-    }
-
-    public static final int unsigned(short s)
-    {
-        return (0x10000 + s) & 0xFFFF;
-    }
-
-    public static final long unsigned(int i)
-    {
-        return (0x1000000000L + i) & 0xFFFFFFFFL;
-    }
-
     public static final byte lsb(int i)
     {
         return (byte) (0xFF & i);
@@ -65,13 +50,13 @@
     public static final String str(ByteBuffer buf, int limit)
     {
         StringBuilder str = new StringBuilder();
-        for (int i = 0; i < limit; i++)
+        for (int i = 0; i < buf.remaining(); i++)
         {
             if (i > 0 && i % 2 == 0)
             {
                 str.append(" ");
             }
-            str.append(String.format("%02x", buf.get(i)));
+            str.append(String.format("%02x", buf.get(buf.position() + i)));
         }
 
         return str.toString();