You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/03/12 19:19:18 UTC

qpid-proton git commit: PROTON-827: Marshal and unmarshal all basic types and reflect.Value

Repository: qpid-proton
Updated Branches:
  refs/heads/master 4653cdc6f -> 7e42628ed


PROTON-827: Marshal and unmarshal all basic types and reflect.Value

Using a type-switch approach, slightly (about 10%) faster than a previous reflection approach.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7e42628e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7e42628e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7e42628e

Branch: refs/heads/master
Commit: 7e42628edb5c7d6cadc5fad1d5299aed15e11d38
Parents: 4653cdc
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Mar 10 13:44:43 2015 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Mar 12 14:18:13 2015 -0400

----------------------------------------------------------------------
 .../go/src/apache.org/proton/encoding.go        | 220 ---------
 .../bindings/go/src/apache.org/proton/error.go  |   9 +
 .../go/src/apache.org/proton/interop_test.go    | 139 +++++-
 .../go/src/apache.org/proton/marshal.go         | 157 +++++++
 .../bindings/go/src/apache.org/proton/types.go  | 115 +++--
 .../go/src/apache.org/proton/unmarshal.go       | 441 +++++++++++++++++++
 .../bindings/go/src/apache.org/proton/url.go    |  12 +-
 proton-c/src/codec/encoder.c                    |   2 +-
 8 files changed, 829 insertions(+), 266 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/encoding.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/encoding.go b/proton-c/bindings/go/src/apache.org/proton/encoding.go
deleted file mode 100644
index a781a68..0000000
--- a/proton-c/bindings/go/src/apache.org/proton/encoding.go
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-oor more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package proton
-
-// #include <proton/codec.h>
-import "C"
-
-import (
-	"bytes"
-	"io"
-	"reflect"
-	"unsafe"
-)
-
-//
-// Decoding from a pn_data_t
-//
-// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
-// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
-//
-
-// Decoder decodes AMQP values from an io.Reader.
-//
-type Decoder struct {
-	reader io.Reader
-	buffer bytes.Buffer
-}
-
-// NewDecoder returns a new decoder that reads from r.
-//
-// The decoder has it's own buffer and may read more data than required for the
-// AMQP values requested.  Use Buffered to see if there is data left in the
-// buffer.
-//
-func NewDecoder(r io.Reader) *Decoder {
-	return &Decoder{r, bytes.Buffer{}}
-}
-
-// Buffered returns a reader of the data remaining in the Decoder's buffer. The
-// reader is valid until the next call to Decode.
-//
-func (d *Decoder) Buffered() io.Reader {
-	return bytes.NewReader(d.buffer.Bytes())
-}
-
-// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
-//
-// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
-//
-func (d *Decoder) Decode(v interface{}) (err error) {
-	defer func() {
-		if x := recover(); x != nil {
-			err = errorf("%v", x)
-		}
-	}()
-	data := C.pn_data(0)
-	defer C.pn_data_free(data)
-
-	var n int
-	for n == 0 {
-		n = unmarshal(data, d.buffer.Bytes(), v)
-		if n == 0 { // n == 0 means not enough data, read more
-			err = d.more()
-		}
-	}
-	d.buffer.Next(n)
-	return
-}
-
-/*
-Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
-
-Go types that can be unmarshalled from AMQP types
-
-bool from AMQP bool
-
-int8, int16, int32, int64 from equivalent or smaller AMQP signed integer type.
-
-uint8, uint16, uint32, uint64 types from equivalent or smaller AMQP unsigned integer type.
-
-float32, float64 from equivalent or smaller AMQP float type.
-
-string, []byte from AMQP string, symbol or binary.
-
-TODO types
-
-AMQP from AMQP null, char, timestamp, decimal32/64/128, uuid, described, array, list, map
-
-Go: uint, int, array, slice, struct, map, reflect/Value
-
-Go types that cannot be unmarshalled
-
-complex64/128, uintptr, function, interface, channel
-*/
-func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
-	defer func() {
-		if x := recover(); x != nil {
-			err = errorf("%v", x)
-		}
-	}()
-	data := C.pn_data(0)
-	defer C.pn_data_free(data)
-	n = unmarshal(data, bytes, v)
-	if n == 0 {
-		err = errorf("not enough data")
-	}
-	return
-}
-
-func pnDataError(data *C.pn_data_t) (code int, msg string) {
-	pnError := C.pn_data_error(data)
-	return int(C.pn_error_code(pnError)), C.GoString(C.pn_error_text(pnError))
-}
-
-// decode from bytes.
-// Return bytes decoded or 0 if we could not decode a complete object.
-//
-func decode(data *C.pn_data_t, bytes []byte) int {
-	if len(bytes) == 0 {
-		return 0
-	}
-	cBuf := (*C.char)(unsafe.Pointer(&bytes[0]))
-	n := int(C.pn_data_decode(data, cBuf, C.size_t(len(bytes))))
-	if n == int(C.PN_EOS) {
-		return 0
-	} else if n <= 0 {
-		panic(errorf("unmarshal %s", pnErrorName(n)))
-	}
-	return n
-}
-
-// more reads more data when we can't parse a complete AMQP type
-func (d *Decoder) more() error {
-	var readSize int64 = 256
-	if int64(d.buffer.Len()) > readSize { // Grow by doubling
-		readSize = int64(d.buffer.Len())
-	}
-	var n int64
-	n, err := d.buffer.ReadFrom(&io.LimitedReader{d.reader, readSize})
-	if n == 0 { // ReadFrom won't report io.EOF, just returns 0
-		if err != nil {
-			panic(err)
-		} else {
-			panic("not enough data")
-		}
-	}
-	return nil
-}
-
-// unmarshal decodes from bytes and converts into the value pointed to by v.
-// Used by Unmarshal and Decode
-//
-// Returns the number of bytes decoded or 0 if not enough data.
-//
-func unmarshal(data *C.pn_data_t, bytes []byte, v interface{}) (n int) {
-	n = decode(data, bytes)
-	if n == 0 {
-		return 0
-	}
-	ptrValue := reflect.ValueOf(v)
-	if ptrValue.Type().Kind() != reflect.Ptr {
-		panic(errorf("cannot unmarshal to %T, not a pointer", v))
-	}
-	value := ptrValue.Elem()
-	pnType, ok := pnTypes[C.pn_data_type(data)]
-	if !ok {
-		panic(errorf("unknown AMQP type code %v", C.pn_data_type(data)))
-	}
-	if pnType.getter == nil {
-		panic(errorf("cannot unmarshal AMQP type %s", pnType.name))
-	}
-	decoded := pnType.getter(data)
-	if !decoded.Type().ConvertibleTo(value.Type()) {
-		panic(errorf("cannot unmarshal AMQP %s to %s", pnType.name, value.Type()))
-	}
-	converted := decoded.Convert(value.Type())
-	value.Set(converted)
-	return
-}
-
-func bytesValue(bytes C.pn_bytes_t) reflect.Value {
-	if bytes.start == nil || bytes.size == 0 {
-		return reflect.ValueOf([]byte{})
-	}
-	return reflect.ValueOf(C.GoBytes(unsafe.Pointer(bytes.start), C.int(bytes.size)))
-}
-
-// Get functions to convert PN data to reflect.Value
-func getPnString(data *C.pn_data_t) reflect.Value { return bytesValue(C.pn_data_get_string(data)) }
-func getPnSymbol(data *C.pn_data_t) reflect.Value { return bytesValue(C.pn_data_get_symbol(data)) }
-func getPnBinary(data *C.pn_data_t) reflect.Value { return bytesValue(C.pn_data_get_binary(data)) }
-func getPnBool(data *C.pn_data_t) reflect.Value   { return reflect.ValueOf(C.pn_data_get_bool(data)) }
-func getPnByte(data *C.pn_data_t) reflect.Value   { return reflect.ValueOf(C.pn_data_get_byte(data)) }
-func getPnChar(data *C.pn_data_t) reflect.Value   { return reflect.ValueOf(C.pn_data_get_char(data)) }
-func getPnShort(data *C.pn_data_t) reflect.Value  { return reflect.ValueOf(C.pn_data_get_short(data)) }
-func getPnInt(data *C.pn_data_t) reflect.Value    { return reflect.ValueOf(C.pn_data_get_int(data)) }
-func getPnLong(data *C.pn_data_t) reflect.Value   { return reflect.ValueOf(C.pn_data_get_long(data)) }
-func getPnUbyte(data *C.pn_data_t) reflect.Value  { return reflect.ValueOf(C.pn_data_get_ubyte(data)) }
-func getPnUshort(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_ushort(data)) }
-func getPnUint(data *C.pn_data_t) reflect.Value   { return reflect.ValueOf(C.pn_data_get_uint(data)) }
-func getPnUlong(data *C.pn_data_t) reflect.Value  { return reflect.ValueOf(C.pn_data_get_ulong(data)) }
-func getPnFloat(data *C.pn_data_t) reflect.Value  { return reflect.ValueOf(C.pn_data_get_float(data)) }
-func getPnDouble(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_double(data)) }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/error.go b/proton-c/bindings/go/src/apache.org/proton/error.go
index 6875042..63426de 100644
--- a/proton-c/bindings/go/src/apache.org/proton/error.go
+++ b/proton-c/bindings/go/src/apache.org/proton/error.go
@@ -20,6 +20,7 @@ under the License.
 package proton
 
 // #include <proton/error.h>
+// #include <proton/codec.h>
 import "C"
 
 import (
@@ -66,3 +67,11 @@ func (err pnError) Error() string  { return fmt.Sprintf("proton: %s", string(err
 func errorf(format string, a ...interface{}) error {
 	return pnError(fmt.Sprintf(format, a...))
 }
+
+func pnDataError(data *C.pn_data_t) string {
+	err := C.pn_data_error(data)
+	if err != nil && int(C.pn_error_code(err)) != 0 {
+		return C.GoString(C.pn_error_text(err))
+	}
+	return ""
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/interop_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/interop_test.go b/proton-c/bindings/go/src/apache.org/proton/interop_test.go
index 35e39f0..5333055 100644
--- a/proton-c/bindings/go/src/apache.org/proton/interop_test.go
+++ b/proton-c/bindings/go/src/apache.org/proton/interop_test.go
@@ -35,6 +35,18 @@ import (
 	"testing"
 )
 
+func assertEqual(want interface{}, got interface{}) {
+	if !reflect.DeepEqual(want, got) {
+		panic(errorf("%T(%v) != %T(%v)", want, want, got, got))
+	}
+}
+
+func assertNil(err interface{}) {
+	if err != nil {
+		panic(err)
+	}
+}
+
 func getReader(name string) (r io.Reader) {
 	r, err := os.Open("../../../../../../tests/interop/" + name + ".amqp")
 	if err != nil {
@@ -51,14 +63,20 @@ func remaining(d *Decoder) string {
 // assertDecode: want is the expected value, gotPtr is a pointer to a
 // instance of the same type for Decode.
 func assertDecode(d *Decoder, want interface{}, gotPtr interface{}) {
-	err := d.Decode(gotPtr)
-	if err != nil {
-		panic(err)
-	}
+
+	assertNil(d.Decode(gotPtr))
+
 	got := reflect.ValueOf(gotPtr).Elem().Interface()
-	if !reflect.DeepEqual(want, got) {
-		panic(errorf("%T(%#v) != %T(%#v)", want, want, got, got))
-	}
+	assertEqual(want, got)
+
+	// Try round trip encoding
+	bytes, err := Marshal(want)
+	assertNil(err)
+	n, err := Unmarshal(bytes, gotPtr)
+	assertNil(err)
+	assertEqual(n, len(bytes))
+	got = reflect.ValueOf(gotPtr).Elem().Interface()
+	assertEqual(want, got)
 }
 
 func TestUnmarshal(t *testing.T) {
@@ -125,6 +143,41 @@ func TestPrimitivesCompatible(t *testing.T) {
 	assertDecode(d, 0.125, &f)
 }
 
+// assertDecodeValue: want is the expected value, decode into a reflect.Value
+func assertDecodeValue(d *Decoder, want interface{}) {
+
+	var v reflect.Value
+	assertNil(d.Decode(&v))
+
+	got := v.Interface()
+	assertEqual(want, got)
+
+	// Try round trip encoding
+	bytes, err := Marshal(v)
+	assertNil(err)
+	n, err := Unmarshal(bytes, &v)
+	assertNil(err)
+	assertEqual(n, len(bytes))
+	got = v.Interface()
+	assertEqual(want, got)
+}
+
+func TestPrimitivesValue(t *testing.T) {
+	d := NewDecoder(getReader("primitives"))
+	// Decoding into reflect.Value
+	assertDecodeValue(d, true)
+	assertDecodeValue(d, false)
+	assertDecodeValue(d, uint8(42))
+	assertDecodeValue(d, uint16(42))
+	assertDecodeValue(d, int16(-42))
+	assertDecodeValue(d, uint32(12345))
+	assertDecodeValue(d, int32(-12345))
+	assertDecodeValue(d, uint64(12345))
+	assertDecodeValue(d, int64(-12345))
+	assertDecodeValue(d, float32(0.125))
+	assertDecodeValue(d, float64(0.125))
+}
+
 func TestStrings(t *testing.T) {
 	d := NewDecoder(getReader("strings"))
 	// Test decoding as plain Go strings
@@ -166,6 +219,42 @@ func TestStrings(t *testing.T) {
 	}
 }
 
+func TestEncodeDecode(t *testing.T) {
+	type data struct {
+		s  string
+		i  int
+		u8 uint8
+		b  bool
+		f  float32
+	}
+
+	in := data{"foo", 42, 9, true, 1.234}
+
+	buf := bytes.Buffer{}
+	e := NewEncoder(&buf)
+	e.Encode(in.s)
+	e.Encode(in.i)
+	e.Encode(in.u8)
+	e.Encode(in.b)
+	e.Encode(in.f)
+
+	var out data
+	d := NewDecoder(&buf)
+	d.Decode(&out.s)
+	d.Decode(&out.i)
+	d.Decode(&out.u8)
+	d.Decode(&out.b)
+	d.Decode(&out.f)
+
+	assertEqual(in, out)
+
+	vIn := reflect.ValueOf("thing")
+	e.Encode(vIn)
+	var vOut reflect.Value
+	d.Decode(&vOut)
+	assertEqual("thing", vOut.Interface())
+}
+
 func BenchmarkDecode(b *testing.B) {
 	var buf bytes.Buffer
 	for _, f := range []string{"strings", "primitives"} {
@@ -220,3 +309,39 @@ func BenchmarkDecode(b *testing.B) {
 		d = NewDecoder(bytes.NewReader(buf.Bytes()))
 	}
 }
+
+func BenchmarkEncode(b *testing.B) {
+
+	var buf bytes.Buffer
+	buf.Grow(10000) // Avoid buffer reallocation during benchmark
+	e := NewEncoder(&buf)
+	encode := func(v interface{}) {
+		err := e.Encode(v)
+		if err != nil {
+			panic(err)
+		}
+	}
+
+	for i := 0; i < b.N; i++ {
+		// strings
+		encode([]byte("foo"))
+		encode("foo")
+		encode("bar")
+		encode([]byte(""))
+		encode("")
+		encode("")
+		// primitives
+		encode(true)
+		encode(false)
+		encode(uint8(42))
+		encode(int8(-42))
+		encode(uint16(12345))
+		encode(int16(-12345))
+		encode(uint32(123453245))
+		encode(int32(-123453245))
+		encode(uint64(123456445))
+		encode(int64(-123456445))
+		encode(float32(1.2345))
+		encode(float64(1.23456))
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/marshal.go b/proton-c/bindings/go/src/apache.org/proton/marshal.go
new file mode 100644
index 0000000..957e0cf
--- /dev/null
+++ b/proton-c/bindings/go/src/apache.org/proton/marshal.go
@@ -0,0 +1,157 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package proton
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"io"
+	"reflect"
+	"unsafe"
+)
+
+const minEncode = 256
+
+/*
+Marshal encodes a value as AMQP.
+
+Go types are encoded as follows
+
+bool to AMQP bool.
+
+int, int8, int16, int32, int64 to equivalent AMQP signed integer type.
+
+uint, uint8, uint16, uint32, uint64 to equivalent AMQP unsigned integer type.
+
+float32, float64 to AMQP float or double.
+
+string to AMQP string.
+
+[]byte to AMQP binary.
+
+TODO types
+
+Go: array, slice, struct, map
+
+Go types that cannot be marshaled
+
+complex64/128, uintptr, function, interface, channel
+
+*/
+func Marshal(v interface{}) (bytes []byte, err error) {
+	defer func() {
+		if x := recover(); x != nil {
+			err = errorf("%v", x)
+		}
+	}()
+	return marshal(make([]byte, minEncode), v)
+}
+
+func marshal(bytesIn []byte, v interface{}) (bytes []byte, err error) {
+	data := C.pn_data(0)
+	defer C.pn_data_free(data)
+	switch v := v.(type) {
+	case bool:
+		C.pn_data_put_bool(data, C.bool(v))
+	case int8:
+		C.pn_data_put_byte(data, C.int8_t(v))
+	case int16:
+		C.pn_data_put_short(data, C.int16_t(v))
+	case int32:
+		C.pn_data_put_int(data, C.int32_t(v))
+	case int64:
+		C.pn_data_put_long(data, C.int64_t(v))
+	case int:
+		if unsafe.Sizeof(0) == 8 {
+			C.pn_data_put_long(data, C.int64_t(v))
+		} else {
+			C.pn_data_put_int(data, C.int32_t(v))
+		}
+	case uint8:
+		C.pn_data_put_ubyte(data, C.uint8_t(v))
+	case uint16:
+		C.pn_data_put_ushort(data, C.uint16_t(v))
+	case uint32:
+		C.pn_data_put_uint(data, C.uint32_t(v))
+	case uint64:
+		C.pn_data_put_ulong(data, C.uint64_t(v))
+	case uint:
+		if unsafe.Sizeof(0) == 8 {
+			C.pn_data_put_ulong(data, C.uint64_t(v))
+		} else {
+			C.pn_data_put_uint(data, C.uint32_t(v))
+		}
+	case float32:
+		C.pn_data_put_float(data, C.float(v))
+	case float64:
+		C.pn_data_put_double(data, C.double(v))
+	case string:
+		C.pn_data_put_string(data, toPnBytes([]byte(v)))
+	case []byte:
+		C.pn_data_put_binary(data, toPnBytes(v))
+	case reflect.Value:
+		return marshal(bytesIn, v.Interface())
+	default:
+		panic(errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
+	}
+	// FIXME aconway 2015-03-11: get size from proton.
+	bytes = bytesIn
+	for {
+		n := int(C.pn_data_encode(data, (*C.char)(unsafe.Pointer(&bytes[0])), C.size_t(cap(bytes))))
+		if n != int(C.PN_EOS) {
+			if n < 0 {
+				err = errorf(pnErrorName(n))
+			} else {
+				bytes = bytes[0:n]
+			}
+			return
+		}
+		bytes = make([]byte, cap(bytes)*2)
+	}
+	return
+}
+
+// Encoder encodes AMQP values to an io.Writer
+type Encoder struct {
+	writer io.Writer
+	buffer []byte
+}
+
+// NewEncoder returns a new encoder that writes to w.
+func NewEncoder(w io.Writer) *Encoder {
+	return &Encoder{w, make([]byte, minEncode)}
+}
+
+func (e *Encoder) Encode(v interface{}) (err error) {
+	e.buffer, err = marshal(e.buffer, v)
+	if err == nil {
+		e.writer.Write(e.buffer)
+	}
+	return
+}
+
+func toPnBytes(b []byte) C.pn_bytes_t {
+	if len(b) == 0 {
+		return C.pn_bytes_t{0, nil}
+	} else {
+		return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))}
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/types.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/types.go b/proton-c/bindings/go/src/apache.org/proton/types.go
index 042d9eb..84d0d90 100644
--- a/proton-c/bindings/go/src/apache.org/proton/types.go
+++ b/proton-c/bindings/go/src/apache.org/proton/types.go
@@ -26,42 +26,93 @@ import (
 	"reflect"
 )
 
-type pnGetter func(data *C.pn_data_t) reflect.Value
+type pnGet func(data *C.pn_data_t) reflect.Value
+type pnPut func(data *C.pn_data_t, v interface{})
 
+// Information about a proton type
 type pnType struct {
-	code   C.pn_type_t
-	name   string
-	getter pnGetter
+	code C.pn_type_t // AMQP type code
+	name string      // AMQP type name
+
+	// unmarshal data
+	get     pnGet        // pn_data_get_* function wrapper returning a reflect.Value
+	getType reflect.Type // go type for unmarshaling into a Value
+
+	// marshal data
+	put pnPut // pn_data_put_* function wrapper taking an interface{}
 }
 
-var pnTypes = map[C.pn_type_t]pnType{
-	C.PN_NULL:       {C.PN_NULL, "null", nil},
-	C.PN_BOOL:       {C.PN_BOOL, "bool", getPnBool},
-	C.PN_UBYTE:      {C.PN_UBYTE, "ubyte", getPnUbyte},
-	C.PN_BYTE:       {C.PN_BYTE, "byte", getPnByte},
-	C.PN_USHORT:     {C.PN_USHORT, "ushort", getPnUshort},
-	C.PN_SHORT:      {C.PN_SHORT, "short", getPnShort},
-	C.PN_UINT:       {C.PN_UINT, "uint", getPnUint},
-	C.PN_INT:        {C.PN_INT, "int", getPnInt},
-	C.PN_CHAR:       {C.PN_CHAR, "char", getPnChar},
-	C.PN_ULONG:      {C.PN_ULONG, "ulong", getPnUlong},
-	C.PN_LONG:       {C.PN_LONG, "long", getPnLong},
-	C.PN_TIMESTAMP:  {C.PN_TIMESTAMP, "timestamp", nil},
-	C.PN_FLOAT:      {C.PN_FLOAT, "float", getPnFloat},
-	C.PN_DOUBLE:     {C.PN_DOUBLE, "double", getPnDouble},
-	C.PN_DECIMAL32:  {C.PN_DECIMAL32, "decimal32", nil},
-	C.PN_DECIMAL64:  {C.PN_DECIMAL64, "decimal64", nil},
-	C.PN_DECIMAL128: {C.PN_DECIMAL128, "decimal128", nil},
-	C.PN_UUID:       {C.PN_UUID, "uuid", nil},
-	C.PN_BINARY:     {C.PN_BINARY, "binary", getPnBinary},
-	C.PN_STRING:     {C.PN_STRING, "string", getPnString},
-	C.PN_SYMBOL:     {C.PN_SYMBOL, "symbol", getPnSymbol},
-	C.PN_DESCRIBED:  {C.PN_DESCRIBED, "described", nil},
-	C.PN_ARRAY:      {C.PN_ARRAY, "array", nil},
-	C.PN_LIST:       {C.PN_LIST, "list", nil},
+func (pt *pnType) String() string { return pt.name }
+
+// pnType definitions for each proton type.
+var (
+	pnNull       = pnType{code: C.PN_NULL, name: "null"}
+	pnBool       = pnType{code: C.PN_BOOL, name: "bool"}
+	pnUbyte      = pnType{code: C.PN_UBYTE, name: "ubyte"}
+	pnByte       = pnType{code: C.PN_BYTE, name: "byte"}
+	pnUshort     = pnType{code: C.PN_USHORT, name: "ushort"}
+	pnShort      = pnType{code: C.PN_SHORT, name: "short"}
+	pnChar       = pnType{code: C.PN_CHAR, name: "char"}
+	pnUint       = pnType{code: C.PN_UINT, name: "uint"}
+	pnInt        = pnType{code: C.PN_INT, name: "int"}
+	pnUlong      = pnType{code: C.PN_ULONG, name: "ulong"}
+	pnLong       = pnType{code: C.PN_LONG, name: "long"}
+	pnTimestamp  = pnType{code: C.PN_TIMESTAMP, name: "timestamp"}
+	pnFloat      = pnType{code: C.PN_FLOAT, name: "float"}
+	pnDouble     = pnType{code: C.PN_DOUBLE, name: "double"}
+	pnDecimal32  = pnType{code: C.PN_DECIMAL32, name: "decimal32"}
+	pnDecimal64  = pnType{code: C.PN_DECIMAL64, name: "decimal64"}
+	pnDecimal128 = pnType{code: C.PN_DECIMAL128, name: "decimal128"}
+	pnUuid       = pnType{code: C.PN_UUID, name: "uuid"}
+	pnBinary     = pnType{code: C.PN_BINARY, name: "binary"}
+	pnString     = pnType{code: C.PN_STRING, name: "string"}
+	pnSymbol     = pnType{code: C.PN_SYMBOL, name: "symbol"}
+	pnDescribed  = pnType{code: C.PN_DESCRIBED, name: "described"}
+	pnArray      = pnType{code: C.PN_ARRAY, name: "array"}
+	pnList       = pnType{code: C.PN_LIST, name: "list"}
+	pnMap        = pnType{code: C.PN_MAP, name: "map"}
+)
+
+// Map from pn_type_t codes to pnType structs
+var pnTypes = map[C.pn_type_t]*pnType{
+	C.PN_NULL:       &pnNull,
+	C.PN_BOOL:       &pnBool,
+	C.PN_UBYTE:      &pnUbyte,
+	C.PN_BYTE:       &pnByte,
+	C.PN_USHORT:     &pnUshort,
+	C.PN_SHORT:      &pnShort,
+	C.PN_UINT:       &pnUint,
+	C.PN_INT:        &pnInt,
+	C.PN_CHAR:       &pnChar,
+	C.PN_ULONG:      &pnUlong,
+	C.PN_LONG:       &pnLong,
+	C.PN_TIMESTAMP:  &pnTimestamp,
+	C.PN_FLOAT:      &pnFloat,
+	C.PN_DOUBLE:     &pnDouble,
+	C.PN_DECIMAL32:  &pnDecimal32,
+	C.PN_DECIMAL64:  &pnDecimal64,
+	C.PN_DECIMAL128: &pnDecimal128,
+	C.PN_UUID:       &pnUuid,
+	C.PN_BINARY:     &pnBinary,
+	C.PN_STRING:     &pnString,
+	C.PN_SYMBOL:     &pnSymbol,
+	C.PN_DESCRIBED:  &pnDescribed,
+	C.PN_ARRAY:      &pnArray,
+	C.PN_LIST:       &pnList,
+	C.PN_MAP:        &pnMap,
 }
 
-func pnTypeName(t C.pn_type_t) string {
-	name := pnTypes[t].name
-	return nonBlank(name, "unknown type")
+// Get the pnType for a pn_type_t code, panic if unknown code.
+func getPnType(code C.pn_type_t) *pnType {
+	pt, ok := pnTypes[code]
+	if !ok {
+		panic(errorf("unknown AMQP type code %v", code))
+	}
+	return pt
 }
+
+// Go types
+var (
+	bytesType = reflect.TypeOf([]byte{})
+	valueType = reflect.TypeOf(reflect.Value{})
+)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/unmarshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/unmarshal.go b/proton-c/bindings/go/src/apache.org/proton/unmarshal.go
new file mode 100644
index 0000000..4255b28
--- /dev/null
+++ b/proton-c/bindings/go/src/apache.org/proton/unmarshal.go
@@ -0,0 +1,441 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package proton
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"bytes"
+	"io"
+	"reflect"
+	"unsafe"
+)
+
+const minDecode = 256
+
+//
+// Decoding from a pn_data_t
+//
+// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
+// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
+//
+
+// Decoder decodes AMQP values from an io.Reader.
+//
+type Decoder struct {
+	reader io.Reader
+	buffer bytes.Buffer
+}
+
+// NewDecoder returns a new decoder that reads from r.
+//
+// The decoder has its own buffer and may read more data than required for the
+// AMQP values requested.  Use Buffered to see if there is data left in the
+// buffer.
+//
+func NewDecoder(r io.Reader) *Decoder {
+	return &Decoder{r, bytes.Buffer{}}
+}
+
+// Buffered returns a reader of the data remaining in the Decoder's buffer. The
+// reader is valid until the next call to Decode.
+//
+func (d *Decoder) Buffered() io.Reader {
+	return bytes.NewReader(d.buffer.Bytes())
+}
+
+// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
+//
+// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
+//
+func (d *Decoder) Decode(v interface{}) (err error) {
+	defer func() {
+		if x := recover(); x != nil {
+			err = errorf("%v", x)
+		}
+	}()
+	data := C.pn_data(0)
+	defer C.pn_data_free(data)
+	var n int
+	for n == 0 && err == nil {
+		n = unmarshal(data, d.buffer.Bytes(), v)
+		if n == 0 { // n == 0 means not enough data, read more
+			err = d.more()
+			if err != nil {
+				return
+			}
+		}
+	}
+	d.buffer.Next(n)
+	return
+}
+
+/*
+Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
+
+Go types that can be unmarshalled from AMQP types
+
+bool from AMQP bool
+
+int, int8, int16, int32, int64 from equivalent or smaller AMQP signed integer type or char
+
+uint, uint8, uint16, uint32, uint64 from equivalent or smaller AMQP unsigned integer type or char
+
+float32, float64 from equivalent or smaller AMQP float type.
+
+string, []byte from AMQP string, symbol or binary.
+
+TODO types
+
+AMQP: null, char, timestamp, decimal32/64/128, uuid, described, array, list, map
+
+Go: array, slice, struct, map
+
+Go types that cannot be unmarshalled
+
+complex64/128, uintptr, function, interface, channel
+*/
+func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
+	defer func() {
+		if x := recover(); x != nil {
+			err = errorf("%v", x)
+		}
+	}()
+	data := C.pn_data(0)
+	defer C.pn_data_free(data)
+	n = unmarshal(data, bytes, v)
+	if n == 0 {
+		err = errorf("not enough data")
+	}
+	return
+}
+
+// more reads more data when we can't parse a complete AMQP type
+func (d *Decoder) more() error {
+	var readSize int64 = minDecode
+	if int64(d.buffer.Len()) > readSize { // Grow by doubling
+		readSize = int64(d.buffer.Len())
+	}
+	var n int64
+	n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
+	if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
+		err = io.EOF
+	}
+	return err
+}
+
+// unmarshal decodes from bytes and converts into the value pointed to by v.
+// Used by Unmarshal and Decode
+//
+// Returns the number of bytes decoded or 0 if not enough data.
+//
+func unmarshal(data *C.pn_data_t, bytes []byte, v interface{}) (n int) {
+	n = decode(data, bytes)
+	if n == 0 {
+		return 0
+	}
+	pnType := C.pn_data_type(data)
+
+	badUnmarshal := func() error {
+		notPtr := ""
+		if reflect.TypeOf(v).Kind() != reflect.Ptr {
+			notPtr = ", target is not a pointer"
+		}
+		return errorf("cannot unmarshal AMQP %s to %s%s", getPnType(pnType), reflect.TypeOf(v), notPtr)
+	}
+
+	switch v := v.(type) {
+	case *bool:
+		switch pnType {
+		case C.PN_BOOL:
+			*v = bool(C.pn_data_get_bool(data))
+		default:
+			panic(badUnmarshal())
+		}
+	case *int8:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int8(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int8(C.pn_data_get_byte(data))
+		default:
+			panic(badUnmarshal())
+		}
+	case *uint8:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint8(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint8(C.pn_data_get_ubyte(data))
+		default:
+			panic(badUnmarshal())
+		}
+	case *int16:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int16(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int16(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int16(C.pn_data_get_short(data))
+		default:
+			panic(badUnmarshal())
+		}
+	case *uint16:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint16(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint16(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint16(C.pn_data_get_ushort(data))
+		default:
+			panic(badUnmarshal())
+		}
+	case *int32:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int32(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int32(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int32(C.pn_data_get_short(data))
+		case C.PN_INT:
+			*v = int32(C.pn_data_get_int(data))
+		default:
+			panic(badUnmarshal())
+		}
+	case *uint32:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint32(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint32(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint32(C.pn_data_get_ushort(data))
+		case C.PN_UINT:
+			*v = uint32(C.pn_data_get_uint(data))
+		default:
+			panic(badUnmarshal())
+		}
+
+	case *int64:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int64(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int64(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int64(C.pn_data_get_short(data))
+		case C.PN_INT:
+			*v = int64(C.pn_data_get_int(data))
+		case C.PN_LONG:
+			*v = int64(C.pn_data_get_long(data))
+		default:
+			panic(badUnmarshal())
+		}
+
+	case *uint64:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint64(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint64(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint64(C.pn_data_get_ushort(data))
+		case C.PN_ULONG:
+			*v = uint64(C.pn_data_get_ulong(data))
+		default:
+			panic(badUnmarshal())
+		}
+
+	case *int:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = int(C.pn_data_get_char(data))
+		case C.PN_BYTE:
+			*v = int(C.pn_data_get_byte(data))
+		case C.PN_SHORT:
+			*v = int(C.pn_data_get_short(data))
+		case C.PN_INT:
+			*v = int(C.pn_data_get_int(data))
+		case C.PN_LONG:
+			if unsafe.Sizeof(0) == 8 {
+				*v = int(C.pn_data_get_long(data))
+			} else {
+				panic(badUnmarshal())
+			}
+		default:
+			panic(badUnmarshal())
+		}
+
+	case *uint:
+		switch pnType {
+		case C.PN_CHAR:
+			*v = uint(C.pn_data_get_char(data))
+		case C.PN_UBYTE:
+			*v = uint(C.pn_data_get_ubyte(data))
+		case C.PN_USHORT:
+			*v = uint(C.pn_data_get_ushort(data))
+		case C.PN_UINT:
+			*v = uint(C.pn_data_get_uint(data))
+		case C.PN_ULONG:
+			if unsafe.Sizeof(0) == 8 {
+				*v = uint(C.pn_data_get_ulong(data))
+			} else {
+				panic(badUnmarshal())
+			}
+		default:
+			panic(badUnmarshal())
+		}
+
+	case *float32:
+		switch pnType {
+		case C.PN_FLOAT:
+			*v = float32(C.pn_data_get_float(data))
+		default:
+			panic(badUnmarshal())
+		}
+
+	case *float64:
+		switch pnType {
+		case C.PN_FLOAT:
+			*v = float64(C.pn_data_get_float(data))
+		case C.PN_DOUBLE:
+			*v = float64(C.pn_data_get_double(data))
+		default:
+			panic(badUnmarshal())
+		}
+
+	case *string:
+		switch pnType {
+		case C.PN_STRING:
+			*v = goString(C.pn_data_get_string(data))
+		case C.PN_SYMBOL:
+			*v = goString(C.pn_data_get_symbol(data))
+		case C.PN_BINARY:
+			*v = goString(C.pn_data_get_binary(data))
+		default:
+			panic(badUnmarshal())
+		}
+
+	case *[]byte:
+		switch pnType {
+		case C.PN_STRING:
+			*v = goBytes(C.pn_data_get_string(data))
+		case C.PN_SYMBOL:
+			*v = goBytes(C.pn_data_get_symbol(data))
+		case C.PN_BINARY:
+			*v = goBytes(C.pn_data_get_binary(data))
+		default:
+			panic(badUnmarshal())
+		}
+
+	case *reflect.Value:
+		switch pnType {
+		case C.PN_BOOL:
+			*v = reflect.ValueOf(bool(C.pn_data_get_bool(data)))
+		case C.PN_UBYTE:
+			*v = reflect.ValueOf(uint8(C.pn_data_get_ubyte(data)))
+		case C.PN_BYTE:
+			*v = reflect.ValueOf(int8(C.pn_data_get_byte(data)))
+		case C.PN_USHORT:
+			*v = reflect.ValueOf(uint16(C.pn_data_get_ushort(data)))
+		case C.PN_SHORT:
+			*v = reflect.ValueOf(int16(C.pn_data_get_short(data)))
+		case C.PN_UINT:
+			*v = reflect.ValueOf(uint32(C.pn_data_get_uint(data)))
+		case C.PN_INT:
+			*v = reflect.ValueOf(int32(C.pn_data_get_int(data)))
+		case C.PN_CHAR:
+			*v = reflect.ValueOf(uint8(C.pn_data_get_char(data)))
+		case C.PN_ULONG:
+			*v = reflect.ValueOf(uint64(C.pn_data_get_ulong(data)))
+		case C.PN_LONG:
+			*v = reflect.ValueOf(int64(C.pn_data_get_long(data)))
+		case C.PN_FLOAT:
+			*v = reflect.ValueOf(float32(C.pn_data_get_float(data)))
+		case C.PN_DOUBLE:
+			*v = reflect.ValueOf(float64(C.pn_data_get_double(data)))
+		case C.PN_BINARY:
+			*v = valueOfBytes(C.pn_data_get_binary(data))
+		case C.PN_STRING:
+			*v = valueOfString(C.pn_data_get_string(data))
+		case C.PN_SYMBOL:
+			*v = valueOfString(C.pn_data_get_symbol(data))
+		default:
+			panic(badUnmarshal())
+		}
+
+	default:
+		panic(badUnmarshal())
+	}
+	err := pnDataError(data)
+	if err != "" {
+		panic(errorf("unmarshal %s", err))
+	}
+	return
+}
+
+// decode feeds the encoded AMQP bytes to pn_data_decode, filling data.
+//
+// It returns the number of bytes consumed. The result is 0 when bytes is
+// empty or when the decoder reports PN_EOS, i.e. a complete object could not
+// be decoded from the input. Any other non-positive result from the decoder
+// causes a panic carrying an "unmarshal ..." error built from the error name.
+//
+func decode(data *C.pn_data_t, bytes []byte) int {
+	if len(bytes) == 0 {
+		return 0
+	}
+	// Hand the Go slice's backing storage straight to the C decoder.
+	start := (*C.char)(unsafe.Pointer(&bytes[0]))
+	consumed := int(C.pn_data_decode(data, start, C.size_t(len(bytes))))
+	switch {
+	case consumed == int(C.PN_EOS):
+		// Not enough input for a complete object; report nothing decoded.
+		return 0
+	case consumed <= 0:
+		panic(errorf("unmarshal %s", pnErrorName(consumed)))
+	}
+	return consumed
+}
+
+// valueOfBytes converts a pn_bytes_t into a reflect.Value holding a []byte.
+// A nil start pointer or a zero size yields an empty, non-nil slice;
+// otherwise the slice is produced by C.GoBytes over the given range.
+func valueOfBytes(bytes C.pn_bytes_t) reflect.Value {
+	content := []byte{}
+	if bytes.start != nil && bytes.size != 0 {
+		content = C.GoBytes(unsafe.Pointer(bytes.start), C.int(bytes.size))
+	}
+	return reflect.ValueOf(content)
+}
+
+// valueOfString converts a pn_bytes_t into a reflect.Value holding a string.
+// A nil start pointer or a zero size yields the empty string; otherwise the
+// string is produced by C.GoStringN over the given range.
+func valueOfString(bytes C.pn_bytes_t) reflect.Value {
+	text := ""
+	if bytes.start != nil && bytes.size != 0 {
+		text = C.GoStringN(bytes.start, C.int(bytes.size))
+	}
+	return reflect.ValueOf(text)
+}
+
+// goBytes converts a pn_bytes_t into a Go []byte using C.GoBytes.
+// It returns a nil slice when the start pointer is nil.
+func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
+	if cBytes.start == nil {
+		return nil
+	}
+	return C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
+}
+
+// goString converts a pn_bytes_t into a Go string using C.GoStringN.
+// It returns the empty string when the start pointer is nil.
+func goString(cBytes C.pn_bytes_t) (str string) {
+	if cBytes.start == nil {
+		return ""
+	}
+	return C.GoStringN(cBytes.start, C.int(cBytes.size))
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/url.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/url.go b/proton-c/bindings/go/src/apache.org/proton/url.go
index 1511203..079dfb2 100644
--- a/proton-c/bindings/go/src/apache.org/proton/url.go
+++ b/proton-c/bindings/go/src/apache.org/proton/url.go
@@ -40,8 +40,8 @@ import (
 )
 
 const (
-	AMQP  string = "amqp"
-	AMQPS        = "amqps"
+	amqp  string = "amqp"
+	amqps        = "amqps"
 )
 
 // ParseUrl parses an AMQP URL string and returns a net/url.Url.
@@ -69,13 +69,13 @@ func ParseURL(s string) (u *url.URL, err error) {
 		return nil, errorf("bad URL %#v: %s", s, err)
 	}
 	if scheme == "" {
-		scheme = AMQP
+		scheme = amqp
 	}
 	if port == "" {
-		if scheme == AMQPS {
-			port = AMQPS
+		if scheme == amqps {
+			port = amqps
 		} else {
-			port = AMQP
+			port = amqp
 		}
 	}
 	var user *url.Userinfo

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/src/codec/encoder.c
----------------------------------------------------------------------
diff --git a/proton-c/src/codec/encoder.c b/proton-c/src/codec/encoder.c
index fd126be..2b9218f 100644
--- a/proton-c/src/codec/encoder.c
+++ b/proton-c/src/codec/encoder.c
@@ -152,7 +152,7 @@ static uint8_t pn_node2code(pn_encoder_t *encoder, pni_node_t *node)
 
 static size_t pn_encoder_remaining(pn_encoder_t *encoder)
 {
-  return encoder->output + encoder->size - encoder->position;
+  return encoder->output ? (encoder->output + encoder->size - encoder->position) : 0;
 }
 
 static inline int pn_encoder_writef8(pn_encoder_t *encoder, uint8_t value)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org