You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/03/12 19:19:18 UTC
qpid-proton git commit: PROTON-827: Marshal and unmarshal all basic
types and reflect.Value
Repository: qpid-proton
Updated Branches:
refs/heads/master 4653cdc6f -> 7e42628ed
PROTON-827: Marshal and unmarshal all basic types and reflect.Value
Using a type-switch approach, slightly (about 10%) faster than a previous reflection approach.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/7e42628e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/7e42628e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/7e42628e
Branch: refs/heads/master
Commit: 7e42628edb5c7d6cadc5fad1d5299aed15e11d38
Parents: 4653cdc
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Mar 10 13:44:43 2015 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Mar 12 14:18:13 2015 -0400
----------------------------------------------------------------------
.../go/src/apache.org/proton/encoding.go | 220 ---------
.../bindings/go/src/apache.org/proton/error.go | 9 +
.../go/src/apache.org/proton/interop_test.go | 139 +++++-
.../go/src/apache.org/proton/marshal.go | 157 +++++++
.../bindings/go/src/apache.org/proton/types.go | 115 +++--
.../go/src/apache.org/proton/unmarshal.go | 441 +++++++++++++++++++
.../bindings/go/src/apache.org/proton/url.go | 12 +-
proton-c/src/codec/encoder.c | 2 +-
8 files changed, 829 insertions(+), 266 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/encoding.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/encoding.go b/proton-c/bindings/go/src/apache.org/proton/encoding.go
deleted file mode 100644
index a781a68..0000000
--- a/proton-c/bindings/go/src/apache.org/proton/encoding.go
+++ /dev/null
@@ -1,220 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-oor more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package proton
-
-// #include <proton/codec.h>
-import "C"
-
-import (
- "bytes"
- "io"
- "reflect"
- "unsafe"
-)
-
-//
-// Decoding from a pn_data_t
-//
-// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
-// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
-//
-
-// Decoder decodes AMQP values from an io.Reader.
-//
-type Decoder struct {
- reader io.Reader
- buffer bytes.Buffer
-}
-
-// NewDecoder returns a new decoder that reads from r.
-//
-// The decoder has it's own buffer and may read more data than required for the
-// AMQP values requested. Use Buffered to see if there is data left in the
-// buffer.
-//
-func NewDecoder(r io.Reader) *Decoder {
- return &Decoder{r, bytes.Buffer{}}
-}
-
-// Buffered returns a reader of the data remaining in the Decoder's buffer. The
-// reader is valid until the next call to Decode.
-//
-func (d *Decoder) Buffered() io.Reader {
- return bytes.NewReader(d.buffer.Bytes())
-}
-
-// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
-//
-// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
-//
-func (d *Decoder) Decode(v interface{}) (err error) {
- defer func() {
- if x := recover(); x != nil {
- err = errorf("%v", x)
- }
- }()
- data := C.pn_data(0)
- defer C.pn_data_free(data)
-
- var n int
- for n == 0 {
- n = unmarshal(data, d.buffer.Bytes(), v)
- if n == 0 { // n == 0 means not enough data, read more
- err = d.more()
- }
- }
- d.buffer.Next(n)
- return
-}
-
-/*
-Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
-
-Go types that can be unmarshalled from AMQP types
-
-bool from AMQP bool
-
-int8, int16, int32, int64 from equivalent or smaller AMQP signed integer type.
-
-uint8, uint16, uint32, uint64 types from equivalent or smaller AMQP unsigned integer type.
-
-float32, float64 from equivalent or smaller AMQP float type.
-
-string, []byte from AMQP string, symbol or binary.
-
-TODO types
-
-AMQP from AMQP null, char, timestamp, decimal32/64/128, uuid, described, array, list, map
-
-Go: uint, int, array, slice, struct, map, reflect/Value
-
-Go types that cannot be unmarshalled
-
-complex64/128, uintptr, function, interface, channel
-*/
-func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
- defer func() {
- if x := recover(); x != nil {
- err = errorf("%v", x)
- }
- }()
- data := C.pn_data(0)
- defer C.pn_data_free(data)
- n = unmarshal(data, bytes, v)
- if n == 0 {
- err = errorf("not enough data")
- }
- return
-}
-
-func pnDataError(data *C.pn_data_t) (code int, msg string) {
- pnError := C.pn_data_error(data)
- return int(C.pn_error_code(pnError)), C.GoString(C.pn_error_text(pnError))
-}
-
-// decode from bytes.
-// Return bytes decoded or 0 if we could not decode a complete object.
-//
-func decode(data *C.pn_data_t, bytes []byte) int {
- if len(bytes) == 0 {
- return 0
- }
- cBuf := (*C.char)(unsafe.Pointer(&bytes[0]))
- n := int(C.pn_data_decode(data, cBuf, C.size_t(len(bytes))))
- if n == int(C.PN_EOS) {
- return 0
- } else if n <= 0 {
- panic(errorf("unmarshal %s", pnErrorName(n)))
- }
- return n
-}
-
-// more reads more data when we can't parse a complete AMQP type
-func (d *Decoder) more() error {
- var readSize int64 = 256
- if int64(d.buffer.Len()) > readSize { // Grow by doubling
- readSize = int64(d.buffer.Len())
- }
- var n int64
- n, err := d.buffer.ReadFrom(&io.LimitedReader{d.reader, readSize})
- if n == 0 { // ReadFrom won't report io.EOF, just returns 0
- if err != nil {
- panic(err)
- } else {
- panic("not enough data")
- }
- }
- return nil
-}
-
-// unmarshal decodes from bytes and converts into the value pointed to by v.
-// Used by Unmarshal and Decode
-//
-// Returns the number of bytes decoded or 0 if not enough data.
-//
-func unmarshal(data *C.pn_data_t, bytes []byte, v interface{}) (n int) {
- n = decode(data, bytes)
- if n == 0 {
- return 0
- }
- ptrValue := reflect.ValueOf(v)
- if ptrValue.Type().Kind() != reflect.Ptr {
- panic(errorf("cannot unmarshal to %T, not a pointer", v))
- }
- value := ptrValue.Elem()
- pnType, ok := pnTypes[C.pn_data_type(data)]
- if !ok {
- panic(errorf("unknown AMQP type code %v", C.pn_data_type(data)))
- }
- if pnType.getter == nil {
- panic(errorf("cannot unmarshal AMQP type %s", pnType.name))
- }
- decoded := pnType.getter(data)
- if !decoded.Type().ConvertibleTo(value.Type()) {
- panic(errorf("cannot unmarshal AMQP %s to %s", pnType.name, value.Type()))
- }
- converted := decoded.Convert(value.Type())
- value.Set(converted)
- return
-}
-
-func bytesValue(bytes C.pn_bytes_t) reflect.Value {
- if bytes.start == nil || bytes.size == 0 {
- return reflect.ValueOf([]byte{})
- }
- return reflect.ValueOf(C.GoBytes(unsafe.Pointer(bytes.start), C.int(bytes.size)))
-}
-
-// Get functions to convert PN data to reflect.Value
-func getPnString(data *C.pn_data_t) reflect.Value { return bytesValue(C.pn_data_get_string(data)) }
-func getPnSymbol(data *C.pn_data_t) reflect.Value { return bytesValue(C.pn_data_get_symbol(data)) }
-func getPnBinary(data *C.pn_data_t) reflect.Value { return bytesValue(C.pn_data_get_binary(data)) }
-func getPnBool(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_bool(data)) }
-func getPnByte(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_byte(data)) }
-func getPnChar(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_char(data)) }
-func getPnShort(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_short(data)) }
-func getPnInt(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_int(data)) }
-func getPnLong(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_long(data)) }
-func getPnUbyte(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_ubyte(data)) }
-func getPnUshort(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_ushort(data)) }
-func getPnUint(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_uint(data)) }
-func getPnUlong(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_ulong(data)) }
-func getPnFloat(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_float(data)) }
-func getPnDouble(data *C.pn_data_t) reflect.Value { return reflect.ValueOf(C.pn_data_get_double(data)) }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/error.go b/proton-c/bindings/go/src/apache.org/proton/error.go
index 6875042..63426de 100644
--- a/proton-c/bindings/go/src/apache.org/proton/error.go
+++ b/proton-c/bindings/go/src/apache.org/proton/error.go
@@ -20,6 +20,7 @@ under the License.
package proton
// #include <proton/error.h>
+// #include <proton/codec.h>
import "C"
import (
@@ -66,3 +67,11 @@ func (err pnError) Error() string { return fmt.Sprintf("proton: %s", string(err
func errorf(format string, a ...interface{}) error {
return pnError(fmt.Sprintf(format, a...))
}
+
+func pnDataError(data *C.pn_data_t) string {
+ err := C.pn_data_error(data)
+ if err != nil && int(C.pn_error_code(err)) != 0 {
+ return C.GoString(C.pn_error_text(err))
+ }
+ return ""
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/interop_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/interop_test.go b/proton-c/bindings/go/src/apache.org/proton/interop_test.go
index 35e39f0..5333055 100644
--- a/proton-c/bindings/go/src/apache.org/proton/interop_test.go
+++ b/proton-c/bindings/go/src/apache.org/proton/interop_test.go
@@ -35,6 +35,18 @@ import (
"testing"
)
+func assertEqual(want interface{}, got interface{}) {
+ if !reflect.DeepEqual(want, got) {
+ panic(errorf("%T(%v) != %T(%v)", want, want, got, got))
+ }
+}
+
+func assertNil(err interface{}) {
+ if err != nil {
+ panic(err)
+ }
+}
+
func getReader(name string) (r io.Reader) {
r, err := os.Open("../../../../../../tests/interop/" + name + ".amqp")
if err != nil {
@@ -51,14 +63,20 @@ func remaining(d *Decoder) string {
// assertDecode: want is the expected value, gotPtr is a pointer to a
// instance of the same type for Decode.
func assertDecode(d *Decoder, want interface{}, gotPtr interface{}) {
- err := d.Decode(gotPtr)
- if err != nil {
- panic(err)
- }
+
+ assertNil(d.Decode(gotPtr))
+
got := reflect.ValueOf(gotPtr).Elem().Interface()
- if !reflect.DeepEqual(want, got) {
- panic(errorf("%T(%#v) != %T(%#v)", want, want, got, got))
- }
+ assertEqual(want, got)
+
+ // Try round trip encoding
+ bytes, err := Marshal(want)
+ assertNil(err)
+ n, err := Unmarshal(bytes, gotPtr)
+ assertNil(err)
+ assertEqual(n, len(bytes))
+ got = reflect.ValueOf(gotPtr).Elem().Interface()
+ assertEqual(want, got)
}
func TestUnmarshal(t *testing.T) {
@@ -125,6 +143,41 @@ func TestPrimitivesCompatible(t *testing.T) {
assertDecode(d, 0.125, &f)
}
+// assertDecodeValue: want is the expected value, decode into a reflect.Value
+func assertDecodeValue(d *Decoder, want interface{}) {
+
+ var v reflect.Value
+ assertNil(d.Decode(&v))
+
+ got := v.Interface()
+ assertEqual(want, got)
+
+ // Try round trip encoding
+ bytes, err := Marshal(v)
+ assertNil(err)
+ n, err := Unmarshal(bytes, &v)
+ assertNil(err)
+ assertEqual(n, len(bytes))
+ got = v.Interface()
+ assertEqual(want, got)
+}
+
+func TestPrimitivesValue(t *testing.T) {
+ d := NewDecoder(getReader("primitives"))
+ // Decoding into reflect.Value
+ assertDecodeValue(d, true)
+ assertDecodeValue(d, false)
+ assertDecodeValue(d, uint8(42))
+ assertDecodeValue(d, uint16(42))
+ assertDecodeValue(d, int16(-42))
+ assertDecodeValue(d, uint32(12345))
+ assertDecodeValue(d, int32(-12345))
+ assertDecodeValue(d, uint64(12345))
+ assertDecodeValue(d, int64(-12345))
+ assertDecodeValue(d, float32(0.125))
+ assertDecodeValue(d, float64(0.125))
+}
+
func TestStrings(t *testing.T) {
d := NewDecoder(getReader("strings"))
// Test decoding as plain Go strings
@@ -166,6 +219,42 @@ func TestStrings(t *testing.T) {
}
}
+func TestEncodeDecode(t *testing.T) {
+ type data struct {
+ s string
+ i int
+ u8 uint8
+ b bool
+ f float32
+ }
+
+ in := data{"foo", 42, 9, true, 1.234}
+
+ buf := bytes.Buffer{}
+ e := NewEncoder(&buf)
+ e.Encode(in.s)
+ e.Encode(in.i)
+ e.Encode(in.u8)
+ e.Encode(in.b)
+ e.Encode(in.f)
+
+ var out data
+ d := NewDecoder(&buf)
+ d.Decode(&out.s)
+ d.Decode(&out.i)
+ d.Decode(&out.u8)
+ d.Decode(&out.b)
+ d.Decode(&out.f)
+
+ assertEqual(in, out)
+
+ vIn := reflect.ValueOf("thing")
+ e.Encode(vIn)
+ var vOut reflect.Value
+ d.Decode(&vOut)
+ assertEqual("thing", vOut.Interface())
+}
+
func BenchmarkDecode(b *testing.B) {
var buf bytes.Buffer
for _, f := range []string{"strings", "primitives"} {
@@ -220,3 +309,39 @@ func BenchmarkDecode(b *testing.B) {
d = NewDecoder(bytes.NewReader(buf.Bytes()))
}
}
+
+func BenchmarkEncode(b *testing.B) {
+
+ var buf bytes.Buffer
+ buf.Grow(10000) // Avoid buffer reallocation during benchmark
+ e := NewEncoder(&buf)
+ encode := func(v interface{}) {
+ err := e.Encode(v)
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ for i := 0; i < b.N; i++ {
+ // strings
+ encode([]byte("foo"))
+ encode("foo")
+ encode("bar")
+ encode([]byte(""))
+ encode("")
+ encode("")
+ // primitives
+ encode(true)
+ encode(false)
+ encode(uint8(42))
+ encode(int8(-42))
+ encode(uint16(12345))
+ encode(int16(-12345))
+ encode(uint32(123453245))
+ encode(int32(-123453245))
+ encode(uint64(123456445))
+ encode(int64(-123456445))
+ encode(float32(1.2345))
+ encode(float64(1.23456))
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/marshal.go b/proton-c/bindings/go/src/apache.org/proton/marshal.go
new file mode 100644
index 0000000..957e0cf
--- /dev/null
+++ b/proton-c/bindings/go/src/apache.org/proton/marshal.go
@@ -0,0 +1,157 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package proton
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "io"
+ "reflect"
+ "unsafe"
+)
+
+const minEncode = 256
+
+/*
+Marshal encodes a value as AMQP.
+
+Go types are encoded as follows
+
+bool to AMQP bool.
+
+int, int8, int16, int32, int64 to equivalent AMQP signed integer type.
+
+uint, uint8, uint16, uint32, uint64 to equivalent or smaller AMQP unsigned integer type.
+
+float32, float64 to AMQP float or double.
+
+string to AMQP string.
+
+[]byte to AMQP binary.
+
+TODO types
+
+Go: array, slice, struct, map, reflect/Value
+
+Go types that cannot be marshaled
+
+complex64/128, uintptr, function, interface, channel
+
+*/
+func Marshal(v interface{}) (bytes []byte, err error) {
+ defer func() {
+ if x := recover(); x != nil {
+ err = errorf("%v", x)
+ }
+ }()
+ return marshal(make([]byte, minEncode), v)
+}
+
+func marshal(bytesIn []byte, v interface{}) (bytes []byte, err error) {
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ switch v := v.(type) {
+ case bool:
+ C.pn_data_put_bool(data, C.bool(v))
+ case int8:
+ C.pn_data_put_byte(data, C.int8_t(v))
+ case int16:
+ C.pn_data_put_short(data, C.int16_t(v))
+ case int32:
+ C.pn_data_put_int(data, C.int32_t(v))
+ case int64:
+ C.pn_data_put_long(data, C.int64_t(v))
+ case int:
+ if unsafe.Sizeof(0) == 8 {
+ C.pn_data_put_long(data, C.int64_t(v))
+ } else {
+ C.pn_data_put_int(data, C.int32_t(v))
+ }
+ case uint8:
+ C.pn_data_put_ubyte(data, C.uint8_t(v))
+ case uint16:
+ C.pn_data_put_ushort(data, C.uint16_t(v))
+ case uint32:
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ case uint64:
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ case uint:
+ if unsafe.Sizeof(0) == 8 {
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ } else {
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ }
+ case float32:
+ C.pn_data_put_float(data, C.float(v))
+ case float64:
+ C.pn_data_put_double(data, C.double(v))
+ case string:
+ C.pn_data_put_string(data, toPnBytes([]byte(v)))
+ case []byte:
+ C.pn_data_put_binary(data, toPnBytes(v))
+ case reflect.Value:
+ return marshal(bytesIn, v.Interface())
+ default:
+ panic(errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
+ }
+ // FIXME aconway 2015-03-11: get size from proton.
+ bytes = bytesIn
+ for {
+ n := int(C.pn_data_encode(data, (*C.char)(unsafe.Pointer(&bytes[0])), C.size_t(cap(bytes))))
+ if n != int(C.PN_EOS) {
+ if n < 0 {
+ err = errorf(pnErrorName(n))
+ } else {
+ bytes = bytes[0:n]
+ }
+ return
+ }
+ bytes = make([]byte, cap(bytes)*2)
+ }
+ return
+}
+
+// Encoder encodes AMQP values to an io.Writer
+type Encoder struct {
+ writer io.Writer
+ buffer []byte
+}
+
+// New encoder returns a new encoder that writes to w.
+func NewEncoder(w io.Writer) *Encoder {
+ return &Encoder{w, make([]byte, minEncode)}
+}
+
+func (e *Encoder) Encode(v interface{}) (err error) {
+ e.buffer, err = marshal(e.buffer, v)
+ if err == nil {
+ e.writer.Write(e.buffer)
+ }
+ return
+}
+
+func toPnBytes(b []byte) C.pn_bytes_t {
+ if len(b) == 0 {
+ return C.pn_bytes_t{0, nil}
+ } else {
+ return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))}
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/types.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/types.go b/proton-c/bindings/go/src/apache.org/proton/types.go
index 042d9eb..84d0d90 100644
--- a/proton-c/bindings/go/src/apache.org/proton/types.go
+++ b/proton-c/bindings/go/src/apache.org/proton/types.go
@@ -26,42 +26,93 @@ import (
"reflect"
)
-type pnGetter func(data *C.pn_data_t) reflect.Value
+type pnGet func(data *C.pn_data_t) reflect.Value
+type pnPut func(data *C.pn_data_t, v interface{})
+// Information about a proton type
type pnType struct {
- code C.pn_type_t
- name string
- getter pnGetter
+ code C.pn_type_t // AMQP type code
+ name string // AMQP type name
+
+ // unmarshal data
+ get pnGet // pn_data_*_get function wrapper returning a reflect.Value
+ getType reflect.Type // go type for unmarshaling into a Value
+
+ // marshal data
+ put pnPut // pn_data_*_put function wrapper taking an interface{}
}
-var pnTypes = map[C.pn_type_t]pnType{
- C.PN_NULL: {C.PN_NULL, "null", nil},
- C.PN_BOOL: {C.PN_BOOL, "bool", getPnBool},
- C.PN_UBYTE: {C.PN_UBYTE, "ubyte", getPnUbyte},
- C.PN_BYTE: {C.PN_BYTE, "byte", getPnByte},
- C.PN_USHORT: {C.PN_USHORT, "ushort", getPnUshort},
- C.PN_SHORT: {C.PN_SHORT, "short", getPnShort},
- C.PN_UINT: {C.PN_UINT, "uint", getPnUint},
- C.PN_INT: {C.PN_INT, "int", getPnInt},
- C.PN_CHAR: {C.PN_CHAR, "char", getPnChar},
- C.PN_ULONG: {C.PN_ULONG, "ulong", getPnUlong},
- C.PN_LONG: {C.PN_LONG, "long", getPnLong},
- C.PN_TIMESTAMP: {C.PN_TIMESTAMP, "timestamp", nil},
- C.PN_FLOAT: {C.PN_FLOAT, "float", getPnFloat},
- C.PN_DOUBLE: {C.PN_DOUBLE, "double", getPnDouble},
- C.PN_DECIMAL32: {C.PN_DECIMAL32, "decimal32", nil},
- C.PN_DECIMAL64: {C.PN_DECIMAL64, "decimal64", nil},
- C.PN_DECIMAL128: {C.PN_DECIMAL128, "decimal128", nil},
- C.PN_UUID: {C.PN_UUID, "uuid", nil},
- C.PN_BINARY: {C.PN_BINARY, "binary", getPnBinary},
- C.PN_STRING: {C.PN_STRING, "string", getPnString},
- C.PN_SYMBOL: {C.PN_SYMBOL, "symbol", getPnSymbol},
- C.PN_DESCRIBED: {C.PN_DESCRIBED, "described", nil},
- C.PN_ARRAY: {C.PN_ARRAY, "array", nil},
- C.PN_LIST: {C.PN_LIST, "list", nil},
+func (pt *pnType) String() string { return pt.name }
+
+// pnType definitions for each proton type.
+var (
+ pnNull = pnType{code: C.PN_NULL, name: "null"}
+ pnBool = pnType{code: C.PN_BOOL, name: "bool"}
+ pnUbyte = pnType{code: C.PN_UBYTE, name: "ubyte"}
+ pnByte = pnType{code: C.PN_BYTE, name: "byte"}
+ pnUshort = pnType{code: C.PN_USHORT, name: "ushort"}
+ pnShort = pnType{code: C.PN_SHORT, name: "short"}
+ pnChar = pnType{code: C.PN_CHAR, name: "char"}
+ pnUint = pnType{code: C.PN_UINT, name: "uint"}
+ pnInt = pnType{code: C.PN_INT, name: "int"}
+ pnUlong = pnType{code: C.PN_ULONG, name: "ulong"}
+ pnLong = pnType{code: C.PN_LONG, name: "long"}
+ pnTimestamp = pnType{code: C.PN_TIMESTAMP, name: "timestamp"}
+ pnFloat = pnType{code: C.PN_FLOAT, name: "float"}
+ pnDouble = pnType{code: C.PN_DOUBLE, name: "double"}
+ pnDecimal32 = pnType{code: C.PN_DECIMAL32, name: "decimal32"}
+ pnDecimal64 = pnType{code: C.PN_DECIMAL64, name: "decimal64"}
+ pnDecimal128 = pnType{code: C.PN_DECIMAL128, name: "decimal128"}
+ pnUuid = pnType{code: C.PN_UUID, name: "uuid"}
+ pnBinary = pnType{code: C.PN_BINARY, name: "binary"}
+ pnString = pnType{code: C.PN_STRING, name: "string"}
+ pnSymbol = pnType{code: C.PN_SYMBOL, name: "symbol"}
+ pnDescribed = pnType{code: C.PN_DESCRIBED, name: "described"}
+ pnArray = pnType{code: C.PN_ARRAY, name: "array"}
+ pnList = pnType{code: C.PN_LIST, name: "list"}
+ pnMap = pnType{code: C.PN_MAP, name: "map"}
+)
+
+// Map from pn_type_t codes to pnType structs
+var pnTypes = map[C.pn_type_t]*pnType{
+ C.PN_NULL: &pnNull,
+ C.PN_BOOL: &pnBool,
+ C.PN_UBYTE: &pnUbyte,
+ C.PN_BYTE: &pnByte,
+ C.PN_USHORT: &pnUshort,
+ C.PN_SHORT: &pnShort,
+ C.PN_UINT: &pnUint,
+ C.PN_INT: &pnInt,
+ C.PN_CHAR: &pnChar,
+ C.PN_ULONG: &pnUlong,
+ C.PN_LONG: &pnLong,
+ C.PN_TIMESTAMP: &pnTimestamp,
+ C.PN_FLOAT: &pnFloat,
+ C.PN_DOUBLE: &pnDouble,
+ C.PN_DECIMAL32: &pnDecimal32,
+ C.PN_DECIMAL64: &pnDecimal64,
+ C.PN_DECIMAL128: &pnDecimal128,
+ C.PN_UUID: &pnUuid,
+ C.PN_BINARY: &pnBinary,
+ C.PN_STRING: &pnString,
+ C.PN_SYMBOL: &pnSymbol,
+ C.PN_DESCRIBED: &pnDescribed,
+ C.PN_ARRAY: &pnArray,
+ C.PN_LIST: &pnList,
+ C.PN_MAP: &pnMap,
}
-func pnTypeName(t C.pn_type_t) string {
- name := pnTypes[t].name
- return nonBlank(name, "unknown type")
+// Get the pnType for a pn_type_t code, panic if unknown code.
+func getPnType(code C.pn_type_t) *pnType {
+ pt, ok := pnTypes[code]
+ if !ok {
+ panic(errorf("unknown AMQP type code %v", code))
+ }
+ return pt
}
+
+// Go types
+var (
+ bytesType = reflect.TypeOf([]byte{})
+ valueType = reflect.TypeOf(reflect.Value{})
+)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/unmarshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/unmarshal.go b/proton-c/bindings/go/src/apache.org/proton/unmarshal.go
new file mode 100644
index 0000000..4255b28
--- /dev/null
+++ b/proton-c/bindings/go/src/apache.org/proton/unmarshal.go
@@ -0,0 +1,441 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+oor more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package proton
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "bytes"
+ "io"
+ "reflect"
+ "unsafe"
+)
+
+const minDecode = 256
+
+//
+// Decoding from a pn_data_t
+//
+// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
+// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
+//
+
+// Decoder decodes AMQP values from an io.Reader.
+//
+type Decoder struct {
+ reader io.Reader
+ buffer bytes.Buffer
+}
+
+// NewDecoder returns a new decoder that reads from r.
+//
+// The decoder has it's own buffer and may read more data than required for the
+// AMQP values requested. Use Buffered to see if there is data left in the
+// buffer.
+//
+func NewDecoder(r io.Reader) *Decoder {
+ return &Decoder{r, bytes.Buffer{}}
+}
+
+// Buffered returns a reader of the data remaining in the Decoder's buffer. The
+// reader is valid until the next call to Decode.
+//
+func (d *Decoder) Buffered() io.Reader {
+ return bytes.NewReader(d.buffer.Bytes())
+}
+
+// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
+//
+// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
+//
+func (d *Decoder) Decode(v interface{}) (err error) {
+ defer func() {
+ if x := recover(); x != nil {
+ err = errorf("%v", x)
+ }
+ }()
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ var n int
+ for n == 0 && err == nil {
+ n = unmarshal(data, d.buffer.Bytes(), v)
+ if n == 0 { // n == 0 means not enough data, read more
+ err = d.more()
+ if err != nil {
+ return
+ }
+ }
+ }
+ d.buffer.Next(n)
+ return
+}
+
+/*
+Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
+
+Go types that can be unmarshalled from AMQP types
+
+bool from AMQP bool
+
+int, int8, int16, int32, int64 from equivalent or smaller AMQP signed integer type or char
+
+uint, uint8, uint16, uint32, uint64 types from equivalent or smaller AMQP unsigned integer type or char
+
+float32, float64 from equivalent or smaller AMQP float type.
+
+string, []byte from AMQP string, symbol or binary.
+
+TODO types
+
+AMQP from AMQP null, char, timestamp, decimal32/64/128, uuid, described, array, list, map
+
+Go: array, slice, struct, map, reflect/Value
+
+Go types that cannot be unmarshalled
+
+complex64/128, uintptr, function, interface, channel
+*/
+func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
+ defer func() {
+ if x := recover(); x != nil {
+ err = errorf("%v", x)
+ }
+ }()
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ n = unmarshal(data, bytes, v)
+ if n == 0 {
+ err = errorf("not enough data")
+ }
+ return
+}
+
+// more reads more data when we can't parse a complete AMQP type
+func (d *Decoder) more() error {
+ var readSize int64 = minDecode
+ if int64(d.buffer.Len()) > readSize { // Grow by doubling
+ readSize = int64(d.buffer.Len())
+ }
+ var n int64
+ n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
+ if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
+ err = io.EOF
+ }
+ return err
+}
+
+// unmarshal decodes from bytes and converts into the value pointed to by v.
+// Used by Unmarshal and Decode
+//
+// Returns the number of bytes decoded or 0 if not enough data.
+//
+func unmarshal(data *C.pn_data_t, bytes []byte, v interface{}) (n int) {
+ n = decode(data, bytes)
+ if n == 0 {
+ return 0
+ }
+ pnType := C.pn_data_type(data)
+
+ badUnmarshal := func() error {
+ notPtr := ""
+ if reflect.TypeOf(v).Kind() != reflect.Ptr {
+ notPtr = ", target is not a pointer"
+ }
+ return errorf("cannot unmarshal AMQP %s to %s%s", getPnType(pnType), reflect.TypeOf(v), notPtr)
+ }
+
+ switch v := v.(type) {
+ case *bool:
+ switch pnType {
+ case C.PN_BOOL:
+ *v = bool(C.pn_data_get_bool(data))
+ default:
+ panic(badUnmarshal())
+ }
+ case *int8:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int8(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int8(C.pn_data_get_byte(data))
+ default:
+ panic(badUnmarshal())
+ }
+ case *uint8:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint8(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint8(C.pn_data_get_ubyte(data))
+ default:
+ panic(badUnmarshal())
+ }
+ case *int16:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int16(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int16(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int16(C.pn_data_get_short(data))
+ default:
+ panic(badUnmarshal())
+ }
+ case *uint16:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint16(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint16(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint16(C.pn_data_get_ushort(data))
+ default:
+ panic(badUnmarshal())
+ }
+ case *int32:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int32(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int32(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int32(C.pn_data_get_short(data))
+ case C.PN_INT:
+ *v = int32(C.pn_data_get_int(data))
+ default:
+ panic(badUnmarshal())
+ }
+ case *uint32:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint32(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint32(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint32(C.pn_data_get_ushort(data))
+ case C.PN_UINT:
+ *v = uint32(C.pn_data_get_uint(data))
+ default:
+ panic(badUnmarshal())
+ }
+
+ case *int64:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int64(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int64(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int64(C.pn_data_get_short(data))
+ case C.PN_INT:
+ *v = int64(C.pn_data_get_int(data))
+ case C.PN_LONG:
+ *v = int64(C.pn_data_get_long(data))
+ default:
+ panic(badUnmarshal())
+ }
+
+ case *uint64:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint64(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint64(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint64(C.pn_data_get_ushort(data))
+ case C.PN_ULONG:
+ *v = uint64(C.pn_data_get_ulong(data))
+ default:
+ panic(badUnmarshal())
+ }
+
+ case *int:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int(C.pn_data_get_short(data))
+ case C.PN_INT:
+ *v = int(C.pn_data_get_int(data))
+ case C.PN_LONG:
+ if unsafe.Sizeof(0) == 8 {
+ *v = int(C.pn_data_get_long(data))
+ } else {
+ panic(badUnmarshal())
+ }
+ default:
+ panic(badUnmarshal())
+ }
+
+ case *uint:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint(C.pn_data_get_ushort(data))
+ case C.PN_UINT:
+ *v = uint(C.pn_data_get_uint(data))
+ case C.PN_ULONG:
+ if unsafe.Sizeof(0) == 8 {
+ *v = uint(C.pn_data_get_ulong(data))
+ } else {
+ panic(badUnmarshal())
+ }
+ default:
+ panic(badUnmarshal())
+ }
+
+ case *float32:
+ switch pnType {
+ case C.PN_FLOAT:
+ *v = float32(C.pn_data_get_float(data))
+ default:
+ panic(badUnmarshal())
+ }
+
+ case *float64:
+ switch pnType {
+ case C.PN_FLOAT:
+ *v = float64(C.pn_data_get_float(data))
+ case C.PN_DOUBLE:
+ *v = float64(C.pn_data_get_double(data))
+ default:
+ panic(badUnmarshal())
+ }
+
+ case *string:
+ switch pnType {
+ case C.PN_STRING:
+ *v = goString(C.pn_data_get_string(data))
+ case C.PN_SYMBOL:
+ *v = goString(C.pn_data_get_symbol(data))
+ case C.PN_BINARY:
+ *v = goString(C.pn_data_get_binary(data))
+ default:
+ panic(badUnmarshal())
+ }
+
+ case *[]byte:
+ switch pnType {
+ case C.PN_STRING:
+ *v = goBytes(C.pn_data_get_string(data))
+ case C.PN_SYMBOL:
+ *v = goBytes(C.pn_data_get_symbol(data))
+ case C.PN_BINARY:
+ *v = goBytes(C.pn_data_get_binary(data))
+ default:
+ panic(badUnmarshal())
+ }
+
+ case *reflect.Value:
+ switch pnType {
+ case C.PN_BOOL:
+ *v = reflect.ValueOf(bool(C.pn_data_get_bool(data)))
+ case C.PN_UBYTE:
+ *v = reflect.ValueOf(uint8(C.pn_data_get_ubyte(data)))
+ case C.PN_BYTE:
+ *v = reflect.ValueOf(int8(C.pn_data_get_byte(data)))
+ case C.PN_USHORT:
+ *v = reflect.ValueOf(uint16(C.pn_data_get_ushort(data)))
+ case C.PN_SHORT:
+ *v = reflect.ValueOf(int16(C.pn_data_get_short(data)))
+ case C.PN_UINT:
+ *v = reflect.ValueOf(uint32(C.pn_data_get_uint(data)))
+ case C.PN_INT:
+ *v = reflect.ValueOf(int32(C.pn_data_get_int(data)))
+ case C.PN_CHAR:
+ *v = reflect.ValueOf(uint8(C.pn_data_get_char(data)))
+ case C.PN_ULONG:
+ *v = reflect.ValueOf(uint64(C.pn_data_get_ulong(data)))
+ case C.PN_LONG:
+ *v = reflect.ValueOf(int64(C.pn_data_get_long(data)))
+ case C.PN_FLOAT:
+ *v = reflect.ValueOf(float32(C.pn_data_get_float(data)))
+ case C.PN_DOUBLE:
+ *v = reflect.ValueOf(float64(C.pn_data_get_double(data)))
+ case C.PN_BINARY:
+ *v = valueOfBytes(C.pn_data_get_binary(data))
+ case C.PN_STRING:
+ *v = valueOfString(C.pn_data_get_string(data))
+ case C.PN_SYMBOL:
+ *v = valueOfString(C.pn_data_get_symbol(data))
+ default:
+ panic(badUnmarshal())
+ }
+
+ default:
+ panic(badUnmarshal())
+ }
+ err := pnDataError(data)
+ if err != "" {
+ panic(errorf("unmarshal %s", err))
+ }
+ return
+}
+
+// decode from bytes.
+// Return bytes decoded or 0 if we could not decode a complete object.
+//
+func decode(data *C.pn_data_t, bytes []byte) int {
+ if len(bytes) == 0 {
+ return 0
+ }
+ cBuf := (*C.char)(unsafe.Pointer(&bytes[0]))
+ n := int(C.pn_data_decode(data, cBuf, C.size_t(len(bytes))))
+ if n == int(C.PN_EOS) {
+ return 0
+ } else if n <= 0 {
+ panic(errorf("unmarshal %s", pnErrorName(n)))
+ }
+ return n
+}
+
+func valueOfBytes(bytes C.pn_bytes_t) reflect.Value {
+ if bytes.start == nil || bytes.size == 0 {
+ return reflect.ValueOf([]byte{})
+ }
+ return reflect.ValueOf(C.GoBytes(unsafe.Pointer(bytes.start), C.int(bytes.size)))
+}
+
+func valueOfString(bytes C.pn_bytes_t) reflect.Value {
+ if bytes.start == nil || bytes.size == 0 {
+ return reflect.ValueOf("")
+ }
+ return reflect.ValueOf(C.GoStringN(bytes.start, C.int(bytes.size)))
+}
+
+func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
+ if cBytes.start != nil {
+ bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
+ }
+ return
+}
+
+func goString(cBytes C.pn_bytes_t) (str string) {
+ if cBytes.start != nil {
+ str = C.GoStringN(cBytes.start, C.int(cBytes.size))
+ }
+ return
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/bindings/go/src/apache.org/proton/url.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/apache.org/proton/url.go b/proton-c/bindings/go/src/apache.org/proton/url.go
index 1511203..079dfb2 100644
--- a/proton-c/bindings/go/src/apache.org/proton/url.go
+++ b/proton-c/bindings/go/src/apache.org/proton/url.go
@@ -40,8 +40,8 @@ import (
)
const (
- AMQP string = "amqp"
- AMQPS = "amqps"
+ amqp string = "amqp"
+ amqps = "amqps"
)
// ParseUrl parses an AMQP URL string and returns a net/url.Url.
@@ -69,13 +69,13 @@ func ParseURL(s string) (u *url.URL, err error) {
return nil, errorf("bad URL %#v: %s", s, err)
}
if scheme == "" {
- scheme = AMQP
+ scheme = amqp
}
if port == "" {
- if scheme == AMQPS {
- port = AMQPS
+ if scheme == amqps {
+ port = amqps
} else {
- port = AMQP
+ port = amqp
}
}
var user *url.Userinfo
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/7e42628e/proton-c/src/codec/encoder.c
----------------------------------------------------------------------
diff --git a/proton-c/src/codec/encoder.c b/proton-c/src/codec/encoder.c
index fd126be..2b9218f 100644
--- a/proton-c/src/codec/encoder.c
+++ b/proton-c/src/codec/encoder.c
@@ -152,7 +152,7 @@ static uint8_t pn_node2code(pn_encoder_t *encoder, pni_node_t *node)
static size_t pn_encoder_remaining(pn_encoder_t *encoder)
{
- return encoder->output + encoder->size - encoder->position;
+ return encoder->output ? (encoder->output + encoder->size - encoder->position) : 0;
}
static inline int pn_encoder_writef8(pn_encoder_t *encoder, uint8_t value)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org