You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/09 01:26:13 UTC
[50/50] [abbrv] qpid-proton git commit: Merge branch 'master' into go1
Merge branch 'master' into go1
Filter and described type support
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/847a83cc
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/847a83cc
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/847a83cc
Branch: refs/heads/go1
Commit: 847a83cc632bf4b69c6d1a57716aac1912d7b1c5
Parents: 569b4fc c31e2ec
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jun 8 21:24:27 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jun 8 21:24:27 2017 -0400
----------------------------------------------------------------------
amqp/interop_test.go | 8 --
amqp/marshal.go | 54 ++++++++-----
amqp/marshal_test.go | 90 +++++++++++++++++++++
amqp/message.go | 120 +++++++++++++++++++--------
amqp/message_test.go | 47 +++++++++--
amqp/types.go | 29 ++++++-
amqp/types_test.go | 197 +++++++++++++++++++++++++++++++++++++++++++++
amqp/unmarshal.go | 197 ++++++++++++++++++++++++++++++---------------
electron/link.go | 42 +++++++---
electron/link_test.go | 6 +-
proton/wrappers.go | 37 ++++++---
11 files changed, 673 insertions(+), 154 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/847a83cc/amqp/interop_test.go
----------------------------------------------------------------------
diff --cc amqp/interop_test.go
index b3e27bc,0000000..a5fb92e
mode 100644,000000..100644
--- a/amqp/interop_test.go
+++ b/amqp/interop_test.go
@@@ -1,385 -1,0 +1,377 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Test that conversion of Go type to/from AMQP is compatible with other
+// bindings.
+//
+package amqp
+
+import (
+ "bytes"
- "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "reflect"
+ "strings"
+ "testing"
+)
+
- func checkEqual(want interface{}, got interface{}) error {
- if !reflect.DeepEqual(want, got) {
- return fmt.Errorf("%#v != %#v", want, got)
- }
- return nil
- }
-
+func getReader(t *testing.T, name string) (r io.Reader) {
+ dir := os.Getenv("PN_INTEROP_DIR")
+ if dir == "" {
+ t.Skip("no PN_INTEROP_DIR in environment")
+ }
+ r, err := os.Open(dir + "/" + name + ".amqp")
+ if err != nil {
+ t.Fatalf("can't open %#v: %v", name, err)
+ }
+ return
+}
+
+func remaining(d *Decoder) string {
+ remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader))
+ return string(remainder)
+}
+
+// checkDecode: want is the expected value, gotPtr is a pointer to an
+// instance of the same type for Decode.
+func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) {
+
+ if err := d.Decode(gotPtr); err != nil {
+ t.Error("Decode failed", err)
+ return
+ }
+ got := reflect.ValueOf(gotPtr).Elem().Interface()
+ if err := checkEqual(want, got); err != nil {
+ t.Error("Decode bad value:", err)
+ return
+ }
+
+ // Try round trip encoding
+ bytes, err := Marshal(want, nil)
+ if err != nil {
+ t.Error("Marshal failed", err)
+ return
+ }
+ n, err := Unmarshal(bytes, gotPtr)
+ if err != nil {
+ t.Error("Unmarshal failed", err)
+ return
+ }
+ if err := checkEqual(n, len(bytes)); err != nil {
+ t.Error("Bad unmarshal length", err)
+ return
+ }
+ got = reflect.ValueOf(gotPtr).Elem().Interface()
+ if err = checkEqual(want, got); err != nil {
+ t.Error("Bad unmarshal value", err)
+ return
+ }
+}
+
+func TestUnmarshal(t *testing.T) {
+ bytes, err := ioutil.ReadAll(getReader(t, "strings"))
+ if err != nil {
+ t.Error(err)
+ }
+ for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
+ var got string
+ n, err := Unmarshal(bytes, &got)
+ if err != nil {
+ t.Error(err)
+ }
+ if want != got {
+ t.Errorf("%#v != %#v", want, got)
+ }
+ bytes = bytes[n:]
+ }
+}
+
+func TestPrimitivesExact(t *testing.T) {
+ d := NewDecoder(getReader(t, "primitives"))
+ // Decoding into exact types
+ var b bool
+ checkDecode(d, true, &b, t)
+ checkDecode(d, false, &b, t)
+ var u8 uint8
+ checkDecode(d, uint8(42), &u8, t)
+ var u16 uint16
+ checkDecode(d, uint16(42), &u16, t)
+ var i16 int16
+ checkDecode(d, int16(-42), &i16, t)
+ var u32 uint32
+ checkDecode(d, uint32(12345), &u32, t)
+ var i32 int32
+ checkDecode(d, int32(-12345), &i32, t)
+ var u64 uint64
+ checkDecode(d, uint64(12345), &u64, t)
+ var i64 int64
+ checkDecode(d, int64(-12345), &i64, t)
+ var f32 float32
+ checkDecode(d, float32(0.125), &f32, t)
+ var f64 float64
+ checkDecode(d, float64(0.125), &f64, t)
+}
+
+func TestPrimitivesCompatible(t *testing.T) {
+ d := NewDecoder(getReader(t, "primitives"))
+ // Decoding into compatible types
+ var b bool
+ var i int
+ var u uint
+ var f float64
+ checkDecode(d, true, &b, t)
+ checkDecode(d, false, &b, t)
+ checkDecode(d, uint(42), &u, t)
+ checkDecode(d, uint(42), &u, t)
+ checkDecode(d, -42, &i, t)
+ checkDecode(d, uint(12345), &u, t)
+ checkDecode(d, -12345, &i, t)
+ checkDecode(d, uint(12345), &u, t)
+ checkDecode(d, -12345, &i, t)
+ checkDecode(d, 0.125, &f, t)
+ checkDecode(d, 0.125, &f, t)
+}
+
+// checkDecodeInterface: want is the expected value, decode into an interface{}
+func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) {
+
+ var got, got2 interface{}
+ if err := d.Decode(&got); err != nil {
+ t.Error("Decode failed", err)
+ return
+ }
+ if err := checkEqual(want, got); err != nil {
+ t.Error(err)
+ return
+ }
+ // Try round trip encoding
+ bytes, err := Marshal(got, nil)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ n, err := Unmarshal(bytes, &got2)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if err := checkEqual(n, len(bytes)); err != nil {
+ t.Error(err)
+ return
+ }
+ if err := checkEqual(want, got2); err != nil {
+ t.Error(err)
+ return
+ }
+}
+
+func TestPrimitivesInterface(t *testing.T) {
+ d := NewDecoder(getReader(t, "primitives"))
+ checkDecodeInterface(d, true, t)
+ checkDecodeInterface(d, false, t)
+ checkDecodeInterface(d, uint8(42), t)
+ checkDecodeInterface(d, uint16(42), t)
+ checkDecodeInterface(d, int16(-42), t)
+ checkDecodeInterface(d, uint32(12345), t)
+ checkDecodeInterface(d, int32(-12345), t)
+ checkDecodeInterface(d, uint64(12345), t)
+ checkDecodeInterface(d, int64(-12345), t)
+ checkDecodeInterface(d, float32(0.125), t)
+ checkDecodeInterface(d, float64(0.125), t)
+}
+
+func TestStrings(t *testing.T) {
+ d := NewDecoder(getReader(t, "strings"))
+ // Test decoding as plain Go strings
+ for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
+ var got string
+ checkDecode(d, want, &got, t)
+ }
+ remains := remaining(d)
+ if remains != "" {
+ t.Errorf("leftover: %s", remains)
+ }
+
+ // Test decoding as specific string types
+ d = NewDecoder(getReader(t, "strings"))
+ var bytes []byte
+ var str, sym string
+ checkDecode(d, []byte("abc\000defg"), &bytes, t)
+ checkDecode(d, "abcdefg", &str, t)
+ checkDecode(d, "abcdefg", &sym, t)
+ checkDecode(d, make([]byte, 0), &bytes, t)
+ checkDecode(d, "", &str, t)
+ checkDecode(d, "", &sym, t)
+ remains = remaining(d)
+ if remains != "" {
+ t.Fatalf("leftover: %s", remains)
+ }
+
+ // Test some error handling
+ d = NewDecoder(getReader(t, "strings"))
+ var s string
+ err := d.Decode(s)
+ if err == nil {
+ t.Fatal("Expected error")
+ }
+ if !strings.Contains(err.Error(), "not a pointer") {
+ t.Error(err)
+ }
+ var i int
+ err = d.Decode(&i)
+ if !strings.Contains(err.Error(), "cannot unmarshal") {
+ t.Error(err)
+ }
+ _, err = Unmarshal([]byte{}, nil)
+ if !strings.Contains(err.Error(), "not enough data") {
+ t.Error(err)
+ }
+ _, err = Unmarshal([]byte("foobar"), nil)
+ if !strings.Contains(err.Error(), "invalid-argument") {
+ t.Error(err)
+ }
+}
+
+func TestEncodeDecode(t *testing.T) {
+ type data struct {
+ s string
+ i int
+ u8 uint8
+ b bool
+ f float32
+ v interface{}
+ }
+
+ in := data{"foo", 42, 9, true, 1.234, "thing"}
+
+ buf := bytes.Buffer{}
+ e := NewEncoder(&buf)
+ if err := e.Encode(in.s); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.i); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.u8); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.b); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.f); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.v); err != nil {
+ t.Error(err)
+ }
+
+ var out data
+ d := NewDecoder(&buf)
+ if err := d.Decode(&out.s); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.i); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.u8); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.b); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.f); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.v); err != nil {
+ t.Error(err)
+ }
+
+ if err := checkEqual(in, out); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestMap(t *testing.T) {
+ d := NewDecoder(getReader(t, "maps"))
+
+ // Generic map
+ var m Map
+ checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t)
+
+ // Interface as map
+ var i interface{}
+ checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t)
+
+ d = NewDecoder(getReader(t, "maps"))
+ // Specific typed map
+ var m2 map[string]int
+ checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t)
+
+ // Nested map
+ m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}}
+ bytes, err := Marshal(m, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = Unmarshal(bytes, &i)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err = checkEqual(m, i); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestList(t *testing.T) {
+ d := NewDecoder(getReader(t, "lists"))
+ var l List
+ checkDecode(d, List{int32(32), "foo", true}, &l, t)
+ checkDecode(d, List{}, &l, t)
+}
+
+// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded
+// as an AMQP string *inside* an AMQP binary?? Skip the test for now.
+func TODO_TestMessage(t *testing.T) {
+ bytes, err := ioutil.ReadAll(getReader(t, "message"))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ m, err := DecodeMessage(bytes)
+ if err != nil {
+ t.Fatal(err)
+ } else {
+ if err := checkEqual(m.Body(), "hello"); err != nil {
+ t.Error(err)
+ }
+ }
+
+ m2 := NewMessageWith("hello")
+ bytes2, err := m2.Encode(nil)
+ if err != nil {
+ t.Error(err)
+ } else {
+ if err = checkEqual(bytes, bytes2); err != nil {
+ t.Error(err)
+ }
+ }
+}
+
+// TODO aconway 2015-03-13: finish the full interop test
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/847a83cc/amqp/marshal.go
----------------------------------------------------------------------
diff --cc amqp/marshal.go
index b6adf90,0000000..ca5e380
mode 100644,000000..100644
--- a/amqp/marshal.go
+++ b/amqp/marshal.go
@@@ -1,271 -1,0 +1,281 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "fmt"
+ "io"
+ "reflect"
+ "unsafe"
+)
+
+// Error returned if Go data cannot be marshaled as an AMQP type.
+type MarshalError struct {
+ // The Go type.
+ GoType reflect.Type
+ s string
+}
+
+func (e MarshalError) Error() string { return e.s }
+
+func newMarshalError(v interface{}, s string) *MarshalError {
+ t := reflect.TypeOf(v)
+ return &MarshalError{GoType: t, s: fmt.Sprintf("cannot marshal %s: %s", t, s)}
+}
+
+func dataMarshalError(v interface{}, data *C.pn_data_t) error {
+ if pe := PnError(C.pn_data_error(data)); pe != nil {
+ return newMarshalError(v, pe.Error())
+ }
+ return nil
+}
+
++func recoverMarshal(err *error) {
++ if r := recover(); r != nil {
++ if merr, ok := r.(*MarshalError); ok {
++ *err = merr
++ } else {
++ panic(r)
++ }
++ }
++}
++
+/*
+Marshal encodes a Go value as AMQP data in buffer.
+If buffer is nil, or is not large enough, a new buffer is created.
+
+Returns the buffer used for encoding with len() adjusted to the actual size of data.
+
+Go types are encoded as follows
+
+ +-------------------------------------+--------------------------------------------+
+ |Go type |AMQP type |
+ +-------------------------------------+--------------------------------------------+
+ |bool |bool |
+ +-------------------------------------+--------------------------------------------+
+ |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) |
+ +-------------------------------------+--------------------------------------------+
+ |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) |
+ +-------------------------------------+--------------------------------------------+
+ |float32, float64 |float, double. |
+ +-------------------------------------+--------------------------------------------+
+ |string |string |
+ +-------------------------------------+--------------------------------------------+
+ |[]byte, Binary |binary |
+ +-------------------------------------+--------------------------------------------+
+ |Symbol |symbol |
+ +-------------------------------------+--------------------------------------------+
+ |interface{} |the contained type |
+ +-------------------------------------+--------------------------------------------+
+ |nil |null |
+ +-------------------------------------+--------------------------------------------+
+ |map[K]T |map with K and T converted as above |
+ +-------------------------------------+--------------------------------------------+
+ |Map |map, may have mixed types for keys, values |
+ +-------------------------------------+--------------------------------------------+
+ |[]T |list with T converted as above |
+ +-------------------------------------+--------------------------------------------+
+ |List |list, may have mixed types for values |
+ +-------------------------------------+--------------------------------------------+
++ |Described |described type |
++ +-------------------------------------+--------------------------------------------+
+
- The following Go types cannot be marshaled: uintptr, function, interface, channel
-
- TODO
-
- Go types: array, slice, struct, complex64/128.
++The following Go types cannot be marshaled: uintptr, function, channel, array (use slice), struct
+
- AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
++TODO: Not yet implemented:
+
- Described types.
++Go types: struct, complex64/128.
+
++AMQP types: decimal32/64/128, char, timestamp, uuid, array.
+*/
+func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
- defer func() {
- if r := recover(); r != nil {
- if merr, ok := r.(*MarshalError); ok {
- err = merr
- } else {
- panic(r)
- }
- }
- }()
-
++ defer recoverMarshal(&err)
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ marshal(v, data)
+ encode := func(buf []byte) ([]byte, error) {
+ n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
+ switch {
+ case n == int(C.PN_OVERFLOW):
+ return buf, overflow
+ case n < 0:
+ return buf, dataMarshalError(v, data)
+ default:
+ return buf[:n], nil
+ }
+ }
+ return encodeGrow(buffer, encode)
+}
+
++// Internal
++func MarshalUnsafe(v interface{}, pn_data unsafe.Pointer) (err error) {
++ defer recoverMarshal(&err)
++ marshal(v, (*C.pn_data_t)(pn_data))
++ return
++}
++
+const minEncode = 256
+
+// overflow is returned when an encoding function can't fit data in the buffer.
+var overflow = fmt.Errorf("buffer too small")
+
+// encodeFn encodes into buffer[0:len(buffer)].
+// Returns buffer with length adjusted for data encoded.
+// If buffer too small, returns overflow as error.
+type encodeFn func(buffer []byte) ([]byte, error)
+
+// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
+// Returns the final buffer.
+func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
+ if buffer == nil || len(buffer) == 0 {
+ buffer = make([]byte, minEncode)
+ }
+ var err error
+ for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
+ buffer = make([]byte, 2*len(buffer))
+ }
+ return buffer, err
+}
+
+func marshal(v interface{}, data *C.pn_data_t) {
+ switch v := v.(type) {
+ case nil:
+ C.pn_data_put_null(data)
+ case bool:
+ C.pn_data_put_bool(data, C.bool(v))
+ case int8:
+ C.pn_data_put_byte(data, C.int8_t(v))
+ case int16:
+ C.pn_data_put_short(data, C.int16_t(v))
+ case int32:
+ C.pn_data_put_int(data, C.int32_t(v))
+ case int64:
+ C.pn_data_put_long(data, C.int64_t(v))
+ case int:
+ if unsafe.Sizeof(int(0)) == 8 {
+ C.pn_data_put_long(data, C.int64_t(v))
+ } else {
+ C.pn_data_put_int(data, C.int32_t(v))
+ }
+ case uint8:
+ C.pn_data_put_ubyte(data, C.uint8_t(v))
+ case uint16:
+ C.pn_data_put_ushort(data, C.uint16_t(v))
+ case uint32:
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ case uint64:
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ case uint:
+ if unsafe.Sizeof(int(0)) == 8 {
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ } else {
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ }
+ case float32:
+ C.pn_data_put_float(data, C.float(v))
+ case float64:
+ C.pn_data_put_double(data, C.double(v))
+ case string:
+ C.pn_data_put_string(data, pnBytes([]byte(v)))
+ case []byte:
+ C.pn_data_put_binary(data, pnBytes(v))
+ case Binary:
+ C.pn_data_put_binary(data, pnBytes([]byte(v)))
+ case Symbol:
+ C.pn_data_put_symbol(data, pnBytes([]byte(v)))
+ case Map: // Special map type
+ C.pn_data_put_map(data)
+ C.pn_data_enter(data)
+ for key, val := range v {
+ marshal(key, data)
+ marshal(val, data)
+ }
+ C.pn_data_exit(data)
++ case Described:
++ C.pn_data_put_described(data)
++ C.pn_data_enter(data)
++ marshal(v.Descriptor, data)
++ marshal(v.Value, data)
++ C.pn_data_exit(data)
++ case AnnotationKey:
++ marshal(v.Get(), data)
+ default:
+ switch reflect.TypeOf(v).Kind() {
+ case reflect.Map:
+ putMap(data, v)
+ case reflect.Slice:
+ putList(data, v)
+ default:
+ panic(newMarshalError(v, "no conversion"))
+ }
+ }
+ if err := dataMarshalError(v, data); err != nil {
+ panic(err)
+ }
+ return
+}
+
+func clearMarshal(v interface{}, data *C.pn_data_t) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
+
+func putMap(data *C.pn_data_t, v interface{}) {
+ mapValue := reflect.ValueOf(v)
+ C.pn_data_put_map(data)
+ C.pn_data_enter(data)
+ for _, key := range mapValue.MapKeys() {
+ marshal(key.Interface(), data)
+ marshal(mapValue.MapIndex(key).Interface(), data)
+ }
+ C.pn_data_exit(data)
+}
+
+func putList(data *C.pn_data_t, v interface{}) {
+ listValue := reflect.ValueOf(v)
+ C.pn_data_put_list(data)
+ C.pn_data_enter(data)
+ for i := 0; i < listValue.Len(); i++ {
+ marshal(listValue.Index(i).Interface(), data)
+ }
+ C.pn_data_exit(data)
+}
+
+// Encoder encodes AMQP values to an io.Writer
+type Encoder struct {
+ writer io.Writer
+ buffer []byte
+}
+
+// NewEncoder returns a new encoder that writes to w.
+func NewEncoder(w io.Writer) *Encoder {
+ return &Encoder{w, make([]byte, minEncode)}
+}
+
+func (e *Encoder) Encode(v interface{}) (err error) {
+ e.buffer, err = Marshal(v, e.buffer)
+ if err == nil {
+ _, err = e.writer.Write(e.buffer)
+ }
+ return err
+}
-
- func replace(data *C.pn_data_t, v interface{}) {
- C.pn_data_clear(data)
- marshal(v, data)
- }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/847a83cc/amqp/marshal_test.go
----------------------------------------------------------------------
diff --cc amqp/marshal_test.go
index 0000000,0000000..d8e0711
new file mode 100644
--- /dev/null
+++ b/amqp/marshal_test.go
@@@ -1,0 -1,0 +1,90 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++import (
++ "testing"
++)
++
++func TestSymbolKey(t *testing.T) {
++ bytes, err := Marshal(AnnotationKeySymbol("foo"), nil)
++ if err != nil {
++ t.Fatal(err)
++ }
++ var k AnnotationKey
++ if _, err := Unmarshal(bytes, &k); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual("foo", string(k.Get().(Symbol))); err != nil {
++ t.Error(err)
++ }
++ var sym Symbol
++ if _, err := Unmarshal(bytes, &sym); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual("foo", sym.String()); err != nil {
++ t.Error(err)
++ }
++
++}
++
++func TestStringKey(t *testing.T) {
++ bytes, err := Marshal(AnnotationKeyString("foo"), nil)
++ if err != nil {
++ t.Fatal(err)
++ }
++ var k AnnotationKey
++ if _, err := Unmarshal(bytes, &k); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual("foo", string(k.Get().(Symbol))); err != nil {
++ t.Error(err)
++ }
++ var s string
++ if _, err := Unmarshal(bytes, &s); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual("foo", s); err != nil {
++ t.Error(err)
++ }
++
++}
++
++func TestIntKey(t *testing.T) {
++ bytes, err := Marshal(AnnotationKeyUint64(12345), nil)
++ if err != nil {
++ t.Fatal(err)
++ }
++ var k AnnotationKey
++ if _, err := Unmarshal(bytes, &k); err != nil {
++ t.Error(err)
++ }
++ if 12345 != k.Get().(uint64) {
++ t.Errorf("(%T)%v != (%T)%v", 12345, k.Get().(uint64))
++ }
++ var n uint64
++ if _, err := Unmarshal(bytes, &n); err != nil {
++ t.Error(err)
++ }
++ if 12345 != n {
++ t.Errorf("%v != %v", 12345, k.Get().(uint64))
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/847a83cc/amqp/message.go
----------------------------------------------------------------------
diff --cc amqp/message.go
index 753682e,0000000..d1ad2eb
mode 100644,000000..100644
--- a/amqp/message.go
+++ b/amqp/message.go
@@@ -1,348 -1,0 +1,402 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+// #include <proton/types.h>
+// #include <proton/message.h>
+// #include <stdlib.h>
+//
+// /* Helper for setting message string fields */
+// typedef int (*set_fn)(pn_message_t*, const char*);
+// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
+// int result = set(m, s);
+// free(s);
+// return result;
+// }
+//
+import "C"
+
+import (
+ "fmt"
+ "runtime"
+ "time"
- "unsafe"
+)
+
+// Message is the interface to an AMQP message.
+type Message interface {
+ // Durable indicates that any parties taking responsibility
+ // for the message must durably store the content.
+ Durable() bool
+ SetDurable(bool)
+
+ // Priority impacts ordering guarantees. Within a
+ // given ordered context, higher priority messages may jump ahead of
+ // lower priority messages.
+ Priority() uint8
+ SetPriority(uint8)
+
+ // TTL or Time To Live, a message may be dropped after this duration
+ TTL() time.Duration
+ SetTTL(time.Duration)
+
+ // FirstAcquirer indicates
+ // that the recipient of the message is the first recipient to acquire
+ // the message, i.e. there have been no failed delivery attempts to
+ // other acquirers. Note that this does not mean the message has not
+ // been delivered to, but not acquired, by other recipients.
+ FirstAcquirer() bool
+ SetFirstAcquirer(bool)
+
+ // DeliveryCount tracks how many attempts have been made to
+ // deliver a message.
+ DeliveryCount() uint32
+ SetDeliveryCount(uint32)
+
+ // MessageId provides a unique identifier for a message.
+ // It can be a string, an unsigned long, a uuid or a
+ // binary value.
+ MessageId() interface{}
+ SetMessageId(interface{})
+
+ UserId() string
+ SetUserId(string)
+
+ Address() string
+ SetAddress(string)
+
+ Subject() string
+ SetSubject(string)
+
+ ReplyTo() string
+ SetReplyTo(string)
+
+ // CorrelationId is set on correlated request and response messages. It can be
+ // a string, an unsigned long, a uuid or a binary value.
+ CorrelationId() interface{}
+ SetCorrelationId(interface{})
+
+ ContentType() string
+ SetContentType(string)
+
+ ContentEncoding() string
+ SetContentEncoding(string)
+
+ // ExpiryTime indicates an absolute time when the message may be dropped.
+ // A Zero time (i.e. t.IsZero() == true) indicates a message never expires.
+ ExpiryTime() time.Time
+ SetExpiryTime(time.Time)
+
+ CreationTime() time.Time
+ SetCreationTime(time.Time)
+
+ GroupId() string
+ SetGroupId(string)
+
+ GroupSequence() int32
+ SetGroupSequence(int32)
+
+ ReplyToGroupId() string
+ SetReplyToGroupId(string)
+
- // Instructions - AMQP delivery instructions.
- Instructions() map[string]interface{}
- SetInstructions(v map[string]interface{})
++ // Property map set by the application to be carried with the message.
++ // Values must be simple types (not maps, lists or sequences)
++ ApplicationProperties() map[string]interface{}
++ SetApplicationProperties(map[string]interface{})
+
- // Annotations - AMQP annotations.
- Annotations() map[string]interface{}
- SetAnnotations(v map[string]interface{})
++ // Per-delivery annotations to provide delivery instructions.
++ // May be added or removed by intermediaries during delivery.
++ DeliveryAnnotations() map[AnnotationKey]interface{}
++ SetDeliveryAnnotations(map[AnnotationKey]interface{})
+
- // Properties - Application properties.
- Properties() map[string]interface{}
- SetProperties(v map[string]interface{})
++ // Message annotations added as part of the bare message at creation, usually
++ // by an AMQP library. See ApplicationProperties() for adding application data.
++ MessageAnnotations() map[AnnotationKey]interface{}
++ SetMessageAnnotations(map[AnnotationKey]interface{})
+
+ // Inferred indicates how the message content
+ // is encoded into AMQP sections. If inferred is true then binary and
+ // list values in the body of the message will be encoded as AMQP DATA
+ // and AMQP SEQUENCE sections, respectively. If inferred is false,
+ // then all values in the body of the message will be encoded as AMQP
+ // VALUE sections regardless of their type.
+ Inferred() bool
+ SetInferred(bool)
+
+ // Marshal a Go value into the message body. See amqp.Marshal() for details.
+ Marshal(interface{})
+
+ // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
+ Unmarshal(interface{})
+
+ // Body value resulting from the default unmarshalling of message body as interface{}
+ Body() interface{}
+
+ // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
+ // the message is encoded into it, otherwise a new buffer is created.
+ // Returns the buffer containing the message.
+ Encode(buffer []byte) ([]byte, error)
+
+ // Decode data into this message. Overwrites an existing message content.
+ Decode(buffer []byte) error
+
+ // Clear the message contents.
+ Clear()
+
+ // Copy the contents of another message to this one.
+ Copy(m Message) error
++
++ // Deprecated: use DeliveryAnnotations() for a more type-safe interface
++ Instructions() map[string]interface{}
++ SetInstructions(v map[string]interface{})
++
++ // Deprecated: use MessageAnnotations() for a more type-safe interface
++ Annotations() map[string]interface{}
++ SetAnnotations(v map[string]interface{})
++
++ // Deprecated: use ApplicationProperties() for a more type-safe interface
++ Properties() map[string]interface{}
++ SetProperties(v map[string]interface{})
+}
+
+type message struct{ pn *C.pn_message_t }
+
+func freeMessage(m *message) {
+ C.pn_message_free(m.pn)
+ m.pn = nil
+}
+
+// NewMessage creates a new message instance.
+func NewMessage() Message {
+ m := &message{C.pn_message()}
+ runtime.SetFinalizer(m, freeMessage)
+ return m
+}
+
+// NewMessageWith creates a message with value as the body. Equivalent to
+// m := NewMessage(); m.Marshal(body)
+func NewMessageWith(value interface{}) Message {
+ m := NewMessage()
+ m.Marshal(value)
+ return m
+}
+
+func (m *message) Clear() { C.pn_message_clear(m.pn) }
+
+func (m *message) Copy(x Message) error {
+ if data, err := x.Encode(nil); err == nil {
+ return m.Decode(data)
+ } else {
+ return err
+ }
+}
+
+// ==== message get functions
+
+func rewindGet(data *C.pn_data_t) (v interface{}) {
+ C.pn_data_rewind(data)
+ C.pn_data_next(data)
+ unmarshal(&v, data)
+ return v
+}
+
- func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(&v, data)
- return v
- }
-
+// Simple accessors. Each one reads a single field from the underlying
+// pn_message_t and converts the C value to the matching Go type.
+// MessageId and CorrelationId are held as pn_data_t values and are
+// unmarshaled with rewindGet.
+func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) }
+func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) }
+func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
+
+// TTL scales the proton value by time.Millisecond to get a time.Duration.
+func (m *message) TTL() time.Duration {
+ return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
+}
+func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) }
+func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) }
+func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) }
+func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) }
+func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) }
+func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) }
+func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
+func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
+func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) }
+func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
+
+// ExpiryTime returns the message expiry time. The proton millisecond
+// timestamp is converted with goTime, which splits it into whole seconds
+// and a nanosecond remainder instead of multiplying the whole value up to
+// nanoseconds, so large timestamps do not overflow int64.
+func (m *message) ExpiryTime() time.Time {
+ return goTime(C.pn_message_get_expiry_time(m.pn))
+}
+
+// CreationTime returns the message creation time. The proton millisecond
+// timestamp is converted with goTime, which splits it into whole seconds
+// and a nanosecond remainder instead of multiplying the whole value up to
+// nanoseconds, so large timestamps do not overflow int64.
+func (m *message) CreationTime() time.Time {
+ return goTime(C.pn_message_get_creation_time(m.pn))
+}
+// Group accessors read the group-id, group-sequence and reply-to-group-id
+// fields from the underlying pn_message_t.
+func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) }
+func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) }
+func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
+
- func (m *message) Instructions() map[string]interface{} {
- return rewindMap(C.pn_message_instructions(m.pn))
++func getAnnotations(data *C.pn_data_t) (v map[AnnotationKey]interface{}) {
++ C.pn_data_rewind(data)
++ C.pn_data_next(data)
++ unmarshal(&v, data)
++ return v
+}
- func (m *message) Annotations() map[string]interface{} {
- return rewindMap(C.pn_message_annotations(m.pn))
++
++func (m *message) DeliveryAnnotations() map[AnnotationKey]interface{} {
++ return getAnnotations(C.pn_message_instructions(m.pn))
+}
- func (m *message) Properties() map[string]interface{} {
- return rewindMap(C.pn_message_properties(m.pn))
++func (m *message) MessageAnnotations() map[AnnotationKey]interface{} {
++ return getAnnotations(C.pn_message_annotations(m.pn))
++}
++
++func (m *message) ApplicationProperties() map[string]interface{} {
++ var v map[string]interface{}
++ data := C.pn_message_properties(m.pn)
++ C.pn_data_rewind(data)
++ C.pn_data_next(data)
++ unmarshal(&v, data)
++ return v
+}
+
+// ==== message set methods
+
+// setData replaces the content of data with the marshaled form of v.
+// data is cleared first so nothing from an earlier value remains.
+func setData(v interface{}, data *C.pn_data_t) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
+
- func dataString(data *C.pn_data_t) string {
- str := C.pn_string(C.CString(""))
- defer C.pn_free(unsafe.Pointer(str))
- C.pn_inspect(unsafe.Pointer(data), str)
- return C.GoString(C.pn_string_get(str))
- }
-
+// Simple setters. Each one converts its Go argument to the matching C type
+// and stores it in the underlying pn_message_t. MessageId and
+// CorrelationId are marshaled into their pn_data_t with setData.
+//
+// String fields go through the C helper msg_set_str together with the
+// proton setter to call.
+// NOTE(review): C.CString allocates C memory; presumably msg_set_str frees
+// it after calling the setter - confirm in the cgo preamble of this file.
+func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(b)) }
+func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) }
+func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
+
+// SetTTL stores d in milliseconds; the integer division drops any
+// sub-millisecond part of d.
+func (m *message) SetTTL(d time.Duration) {
+ C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
+}
+func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
+func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
+func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
+func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
+func (m *message) SetAddress(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
+}
+func (m *message) SetSubject(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
+}
+func (m *message) SetReplyTo(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
+}
+func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
+func (m *message) SetContentType(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
+}
+func (m *message) SetContentEncoding(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
+}
+func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
+func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
+func (m *message) SetGroupId(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
+}
+func (m *message) SetGroupSequence(s int32) {
+ C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
+}
+func (m *message) SetReplyToGroupId(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
+}
+
- func (m *message) SetInstructions(v map[string]interface{}) {
++func (m *message) SetDeliveryAnnotations(v map[AnnotationKey]interface{}) {
+ setData(v, C.pn_message_instructions(m.pn))
+}
- func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) }
- func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) }
++func (m *message) SetMessageAnnotations(v map[AnnotationKey]interface{}) {
++ setData(v, C.pn_message_annotations(m.pn))
++}
++func (m *message) SetApplicationProperties(v map[string]interface{}) {
++ setData(v, C.pn_message_properties(m.pn))
++}
+
+// Marshal/Unmarshal body
+
+// Marshal passes v and the message body data to clearMarshal.
+func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) }
+
+// Unmarshal passes v and the message body data to rewindUnmarshal.
+// NOTE(review): v is presumably a pointer to the destination, as in Body -
+// confirm against rewindUnmarshal.
+func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) }
+
+// Body unmarshals the message body into an empty interface and returns it.
+func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
+
+// Decode clears m and then decodes the AMQP-encoded message in data into
+// it. An empty buffer is rejected with an error; note that m has already
+// been cleared at that point. A negative result from pn_message_decode is
+// returned as an error built from the message's proton error.
+func (m *message) Decode(data []byte) error {
+ m.Clear()
+ if len(data) == 0 {
+ return fmt.Errorf("empty buffer for decode")
+ }
+ if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
+ return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn)))
+ }
+ return nil
+}
+
+// DecodeMessage creates a new message and decodes data into it. The new
+// message is returned even when err is non-nil.
+func DecodeMessage(data []byte) (m Message, err error) {
+ m = NewMessage()
+ err = m.Decode(data)
+ return
+}
+
+// Encode encodes m as AMQP bytes. The work is driven by encodeGrow, which
+// is given buffer and a closure that makes one pn_message_encode attempt
+// into a candidate buffer.
+// NOTE(review): encodeGrow presumably retries with a larger buffer when
+// the closure returns the overflow error - confirm against its definition.
+func (m *message) Encode(buffer []byte) ([]byte, error) {
+ encode := func(buf []byte) ([]byte, error) {
+ // len is passed by address to pn_message_encode and is used afterwards
+ // to slice buf. It shadows the builtin len inside this closure.
+ len := cLen(buf)
+ result := C.pn_message_encode(m.pn, cPtr(buf), &len)
+ switch {
+ case result == C.PN_OVERFLOW:
+ return buf, overflow
+ case result < 0:
+ return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(result))
+ default:
+ return buf[:len], nil
+ }
+ }
+ return encodeGrow(buffer, encode)
+}
+
+// TODO aconway 2015-09-14: Multi-section messages.
+
+// TODO aconway 2016-09-09: Message.String() use inspect.
++
++// ==== Deprecated functions
++
++// oldGetAnnotations rewinds data, steps to its first node and unmarshals
++// that node into a string-keyed map. It backs the deprecated accessors
++// Instructions, Annotations and Properties.
++func oldGetAnnotations(data *C.pn_data_t) (v map[string]interface{}) {
++ C.pn_data_rewind(data)
++ C.pn_data_next(data)
++ unmarshal(&v, data)
++ return v
++}
++
++// Instructions returns the pn_message_instructions map with string keys.
++//
++// Deprecated: use DeliveryAnnotations, which reads the same data with
++// AnnotationKey keys.
++func (m *message) Instructions() map[string]interface{} {
++ return oldGetAnnotations(C.pn_message_instructions(m.pn))
++}
++
++// Annotations returns the pn_message_annotations map with string keys.
++//
++// Deprecated: use MessageAnnotations, which reads the same data with
++// AnnotationKey keys.
++func (m *message) Annotations() map[string]interface{} {
++ return oldGetAnnotations(C.pn_message_annotations(m.pn))
++}
++
++// Properties returns the pn_message_properties map with string keys.
++//
++// Deprecated: use ApplicationProperties, which reads the same data.
++func (m *message) Properties() map[string]interface{} {
++ return oldGetAnnotations(C.pn_message_properties(m.pn))
++}
++
++// fixAnnotations converts old string-keyed annotations to an AnnotationKey
++// map. Each key is wrapped with AnnotationKeyString, so it becomes a
++// Symbol key. The result is always a non-nil map, even when old is nil or
++// empty.
++func fixAnnotations(old map[string]interface{}) (annotations map[AnnotationKey]interface{}) {
++ annotations = make(map[AnnotationKey]interface{})
++ for k, v := range old {
++ annotations[AnnotationKeyString(k)] = v
++ }
++ return
++}
++
++// SetInstructions stores v in pn_message_instructions after converting
++// its string keys to AnnotationKey symbols with fixAnnotations.
++//
++// Deprecated: use SetDeliveryAnnotations.
++func (m *message) SetInstructions(v map[string]interface{}) {
++ setData(fixAnnotations(v), C.pn_message_instructions(m.pn))
++}
++
++// SetAnnotations stores v in pn_message_annotations after converting its
++// string keys to AnnotationKey symbols with fixAnnotations.
++//
++// Deprecated: use SetMessageAnnotations.
++func (m *message) SetAnnotations(v map[string]interface{}) {
++ setData(fixAnnotations(v), C.pn_message_annotations(m.pn))
++}
++
++// SetProperties stores v as the message's application properties.
++//
++// Unlike the annotation maps, application-properties keys are AMQP
++// strings, not symbols. So v is marshaled as-is, the same way
++// SetApplicationProperties does it, and must not go through
++// fixAnnotations, which would encode every key as a symbol.
++//
++// Deprecated: use SetApplicationProperties.
++func (m *message) SetProperties(v map[string]interface{}) {
++ setData(v, C.pn_message_properties(m.pn))
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/847a83cc/amqp/message_test.go
----------------------------------------------------------------------
diff --cc amqp/message_test.go
index 7a6e5a8,0000000..bfebfa1
mode 100644,000000..100644
--- a/amqp/message_test.go
+++ b/amqp/message_test.go
@@@ -1,166 -1,0 +1,201 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+import (
+ "testing"
+ "time"
+)
+
+// roundTrip encodes m, decodes the bytes into a new message and compares
+// the two messages with checkEqual. It returns the first error from
+// encoding, decoding or the comparison.
+func roundTrip(m Message) error {
+ buffer, err := m.Encode(nil)
+ if err != nil {
+ return err
+ }
+ m2, err := DecodeMessage(buffer)
+ if err != nil {
+ return err
+ }
+ return checkEqual(m, m2)
+}
+
+// TestDefaultMessage checks the value every accessor returns on a freshly
+// created message, then checks that the empty message survives an
+// encode/decode round trip.
+func TestDefaultMessage(t *testing.T) {
+ m := NewMessage()
+ // Check defaults. Each row is {value from the message, expected value}.
+ // checkEqual is declared as (want, got), so the two values appear in
+ // the opposite order in its failure message.
+ for _, data := range [][]interface{}{
+ {m.Inferred(), false},
+ {m.Durable(), false},
+ {m.Priority(), uint8(4)},
+ {m.TTL(), time.Duration(0)},
+ {m.UserId(), ""},
+ {m.Address(), ""},
+ {m.Subject(), ""},
+ {m.ReplyTo(), ""},
+ {m.ContentType(), ""},
+ {m.ContentEncoding(), ""},
+ {m.GroupId(), ""},
+ {m.GroupSequence(), int32(0)},
+ {m.ReplyToGroupId(), ""},
+ {m.MessageId(), nil},
+ {m.CorrelationId(), nil},
++ {m.DeliveryAnnotations(), map[AnnotationKey]interface{}{}},
++ {m.MessageAnnotations(), map[AnnotationKey]interface{}{}},
++ {m.ApplicationProperties(), map[string]interface{}{}},
++
++ // Deprecated
+ {m.Instructions(), map[string]interface{}{}},
+ {m.Annotations(), map[string]interface{}{}},
+ {m.Properties(), map[string]interface{}{}},
+ {m.Body(), nil},
+ } {
+ if err := checkEqual(data[0], data[1]); err != nil {
+ t.Error(err)
+ }
+ }
+ if err := roundTrip(m); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestMessageRoundTrip(t *testing.T) {
+ m := NewMessage()
+ m.SetInferred(false)
+ m.SetDurable(true)
+ m.SetPriority(42)
+ m.SetTTL(0)
+ m.SetUserId("user")
+ m.SetAddress("address")
+ m.SetSubject("subject")
+ m.SetReplyTo("replyto")
+ m.SetContentType("content")
+ m.SetContentEncoding("encoding")
+ m.SetGroupId("group")
+ m.SetGroupSequence(42)
+ m.SetReplyToGroupId("replytogroup")
+ m.SetMessageId("id")
+ m.SetCorrelationId("correlation")
- m.SetInstructions(map[string]interface{}{"instructions": "foo"})
- m.SetAnnotations(map[string]interface{}{"annotations": "foo"})
- m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"})
++ m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"})
++ m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"})
++ m.SetApplicationProperties(map[string]interface{}{"int": int32(32), "bool": true})
+ m.Marshal("hello")
+
+ for _, data := range [][]interface{}{
+ {m.Inferred(), false},
+ {m.Durable(), true},
+ {m.Priority(), uint8(42)},
+ {m.TTL(), time.Duration(0)},
+ {m.UserId(), "user"},
+ {m.Address(), "address"},
+ {m.Subject(), "subject"},
+ {m.ReplyTo(), "replyto"},
+ {m.ContentType(), "content"},
+ {m.ContentEncoding(), "encoding"},
+ {m.GroupId(), "group"},
+ {m.GroupSequence(), int32(42)},
+ {m.ReplyToGroupId(), "replytogroup"},
+ {m.MessageId(), "id"},
+ {m.CorrelationId(), "correlation"},
- {m.Instructions(), map[string]interface{}{"instructions": "foo"}},
- {m.Annotations(), map[string]interface{}{"annotations": "foo"}},
- {m.Properties(), map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}},
++
++ {m.DeliveryAnnotations(), map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"}},
++ {m.MessageAnnotations(), map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"}},
++ {m.ApplicationProperties(), map[string]interface{}{"int": int32(32), "bool": true}},
+ {m.Body(), "hello"},
++
++ // Deprecated
++ {m.Instructions(), map[string]interface{}{"instructions": "foo"}},
++ {m.Annotations(), map[string]interface{}{"annotations": "bar"}},
++ } {
++ if err := checkEqual(data[0], data[1]); err != nil {
++ t.Error(err)
++ }
++ }
++ if err := roundTrip(m); err != nil {
++ t.Error(err)
++ }
++}
++
++// TestDeprecated sets the annotation and property maps through the
++// deprecated string-keyed setters, then checks that both the new and the
++// deprecated accessors return the same data, and that the message survives
++// an encode/decode round trip.
++func TestDeprecated(t *testing.T) {
++ m := NewMessage()
++
++ m.SetInstructions(map[string]interface{}{"instructions": "foo"})
++ m.SetAnnotations(map[string]interface{}{"annotations": "bar"})
++ m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true})
++
++ for _, data := range [][]interface{}{
++ {m.DeliveryAnnotations(), map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"}},
++ {m.MessageAnnotations(), map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"}},
++ {m.ApplicationProperties(), map[string]interface{}{"int": int32(32), "bool": true}},
++
++ {m.Instructions(), map[string]interface{}{"instructions": "foo"}},
++ {m.Annotations(), map[string]interface{}{"annotations": "bar"}},
++ {m.Properties(), map[string]interface{}{"int": int32(32), "bool": true}},
+ } {
+ if err := checkEqual(data[0], data[1]); err != nil {
+ t.Error(err)
+ }
+ }
+ if err := roundTrip(m); err != nil {
+ t.Error(err)
+ }
+}
+
+// TestMessageBodyTypes creates messages with an int64, a string and a
+// Binary body. Each body is unmarshaled both into interface{} and into a
+// concrete Go variable. The Binary case also checks that a Binary body can
+// be unmarshaled into a string.
+func TestMessageBodyTypes(t *testing.T) {
+ var s string
+ var body interface{}
+ var i int64
+
+ m := NewMessageWith(int64(42))
+ m.Unmarshal(&body)
+ m.Unmarshal(&i)
+ if err := checkEqual(body.(int64), int64(42)); err != nil {
+ t.Error(err)
+ }
+ if err := checkEqual(i, int64(42)); err != nil {
+ t.Error(err)
+ }
+
+ m = NewMessageWith("hello")
+ m.Unmarshal(&s)
+ m.Unmarshal(&body)
+ if err := checkEqual(s, "hello"); err != nil {
+ t.Error(err)
+ }
+ if err := checkEqual(body.(string), "hello"); err != nil {
+ t.Error(err)
+ }
+ if err := roundTrip(m); err != nil {
+ t.Error(err)
+ }
+
+ m = NewMessageWith(Binary("bin"))
+ m.Unmarshal(&s)
+ m.Unmarshal(&body)
+ if err := checkEqual(body.(Binary), Binary("bin")); err != nil {
+ t.Error(err)
+ }
+ if err := checkEqual(s, "bin"); err != nil {
+ t.Error(err)
+ }
+ if err := roundTrip(m); err != nil {
+ t.Error(err)
+ }
+
+ // TODO aconway 2015-09-08: array etc.
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/847a83cc/amqp/types.go
----------------------------------------------------------------------
diff --cc amqp/types.go
index 2852c23,0000000..9d41de6
mode 100644,000000..100644
--- a/amqp/types.go
+++ b/amqp/types.go
@@@ -1,194 -1,0 +1,221 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "bytes"
+ "fmt"
+ "reflect"
+ "time"
+ "unsafe"
+)
+
+func (t C.pn_type_t) String() string {
+ switch C.pn_type_t(t) {
+ case C.PN_NULL:
+ return "null"
+ case C.PN_BOOL:
+ return "bool"
+ case C.PN_UBYTE:
+ return "ubyte"
+ case C.PN_BYTE:
+ return "byte"
+ case C.PN_USHORT:
+ return "ushort"
+ case C.PN_SHORT:
+ return "short"
+ case C.PN_CHAR:
+ return "char"
+ case C.PN_UINT:
+ return "uint"
+ case C.PN_INT:
+ return "int"
+ case C.PN_ULONG:
+ return "ulong"
+ case C.PN_LONG:
+ return "long"
+ case C.PN_TIMESTAMP:
+ return "timestamp"
+ case C.PN_FLOAT:
+ return "float"
+ case C.PN_DOUBLE:
+ return "double"
+ case C.PN_DECIMAL32:
+ return "decimal32"
+ case C.PN_DECIMAL64:
+ return "decimal64"
+ case C.PN_DECIMAL128:
+ return "decimal128"
+ case C.PN_UUID:
+ return "uuid"
+ case C.PN_BINARY:
+ return "binary"
+ case C.PN_STRING:
+ return "string"
+ case C.PN_SYMBOL:
+ return "symbol"
+ case C.PN_DESCRIBED:
+ return "described"
+ case C.PN_ARRAY:
+ return "array"
+ case C.PN_LIST:
+ return "list"
+ case C.PN_MAP:
+ return "map"
+ default:
- return "no-data"
++ return fmt.Sprintf("<bad-type %v>", int(t))
+ }
+}
+
+// reflect.Type values for Go types, computed once when the package is
+// initialized.
+var (
+ bytesType = reflect.TypeOf([]byte{})
+ valueType = reflect.TypeOf(reflect.Value{})
+)
+
+// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys.
+
+// Map is a generic map that can have mixed key and value types, so it can
+// represent an AMQP map whose keys are all valid Go map keys.
+type Map map[interface{}]interface{}
+
+// List is a generic list that can hold mixed values and can represent any AMQP list.
+type List []interface{}
+
+// Symbol is a string that is encoded as an AMQP symbol.
+type Symbol string
+
++// String returns the symbol as a plain Go string.
++func (s Symbol) String() string { return string(s) }
+
+// GoString returns the symbol in the form s"...", used by the %#v verb.
+func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
+
+// Binary is a string that is encoded as an AMQP binary.
+// It is a string rather than a []byte because []byte is not hashable and
+// can't be used as a map key, and AMQP frequently uses binary types as map
+// keys. It can convert to and from []byte.
+type Binary string
+
++// String returns the binary value as a plain Go string.
++func (b Binary) String() string { return string(b) }
+
+// GoString returns the binary value in the form b"...", used by the %#v verb.
+func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
+
+// GoString for Map prints values with their types, useful for debugging.
+// Entries are written in Go map iteration order, so the order of the
+// output is not stable between calls.
+func (m Map) GoString() string {
+ out := &bytes.Buffer{}
+ fmt.Fprintf(out, "%T{", m)
+ // i counts the entries still to be written, so that a separator is
+ // written after every entry except the last one.
+ i := len(m)
+ for k, v := range m {
+ fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
+ i--
+ if i > 0 {
+ fmt.Fprint(out, ", ")
+ }
+ }
+ fmt.Fprint(out, "}")
+ return out.String()
+}
+
+// GoString for List prints values with their types, useful for debugging.
+// Elements are separated by ", " with no separator after the last one.
+func (l List) GoString() string {
+ out := &bytes.Buffer{}
+ fmt.Fprintf(out, "%T{", l)
+ for i := 0; i < len(l); i++ {
+ fmt.Fprintf(out, "%T(%#v)", l[i], l[i])
+ // Write the separator between elements only, never after the last.
+ if i < len(l)-1 {
+ fmt.Fprint(out, ", ")
+ }
+ }
+ fmt.Fprint(out, "}")
+ return out.String()
+}
+
+// pnTime converts Go time.Time to Proton millisecond Unix time.
+// Any part of t smaller than a millisecond is dropped.
+func pnTime(t time.Time) C.pn_timestamp_t {
+ // t.Unix() is the whole number of seconds, rounded down, and
+ // t.Nanosecond() is the non-negative offset within that second. Adding
+ // them is correct for times before 1970 too, and unlike t.UnixNano()
+ // it does not depend on the time being representable as int64
+ // nanoseconds.
+ msecs := int64(t.Nanosecond()) / int64(time.Millisecond)
+ return C.pn_timestamp_t(t.Unix()*1000 + msecs)
+}
+
+// goTime converts a pn_timestamp_t to a Go time.Time.
+// The millisecond value is split into whole seconds and a nanosecond
+// remainder, which are passed to time.Unix.
+func goTime(t C.pn_timestamp_t) time.Time {
+ secs := int64(t) / 1000
+ nsecs := (int64(t) % 1000) * int64(time.Millisecond)
+ return time.Unix(secs, nsecs)
+}
+
+// goBytes copies the bytes described by cBytes into a new Go slice using
+// C.GoBytes. It returns nil when cBytes.start is nil.
+func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
+ if cBytes.start != nil {
+ bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
+ }
+ return
+}
+
+// goString converts the bytes described by cBytes to a Go string using
+// C.GoStringN. It returns "" when cBytes.start is nil.
+func goString(cBytes C.pn_bytes_t) (str string) {
+ if cBytes.start != nil {
+ str = C.GoStringN(cBytes.start, C.int(cBytes.size))
+ }
+ return
+}
+
+// pnBytes returns a pn_bytes_t describing b. Nothing is copied: the result
+// points at the first element of b, so it refers to b's own storage. An
+// empty or nil slice gives a zero size and a nil start pointer.
+func pnBytes(b []byte) C.pn_bytes_t {
+ if len(b) == 0 {
+ return C.pn_bytes_t{0, nil}
+ } else {
+ return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))}
+ }
+}
+
+// cPtr returns a C char pointer to the first element of b, or nil when b
+// is empty. Nothing is copied.
+func cPtr(b []byte) *C.char {
+ if len(b) == 0 {
+ return nil
+ }
+ return (*C.char)(unsafe.Pointer(&b[0]))
+}
+
+// cLen returns the length of b as a C size_t.
+func cLen(b []byte) C.size_t {
+ return C.size_t(len(b))
+}
++
++// AnnotationKey is used as a map key for AMQP annotation maps which are
++// allowed to have keys that are either symbol or ulong but no other type.
++// The value is unexported, so keys can only be built with the
++// AnnotationKey* constructors below.
++type AnnotationKey struct {
++ value interface{}
++}
++
++// AnnotationKeySymbol returns a key holding the Symbol v.
++func AnnotationKeySymbol(v Symbol) AnnotationKey { return AnnotationKey{v} }
++
++// AnnotationKeyUint64 returns a key holding the uint64 v.
++func AnnotationKeyUint64(v uint64) AnnotationKey { return AnnotationKey{v} }
++
++// AnnotationKeyString returns a key holding v converted to a Symbol.
++func AnnotationKeyString(v string) AnnotationKey { return AnnotationKey{Symbol(v)} }
++
++// Get returns the value, which must be Symbol, uint64 or nil.
++func (k AnnotationKey) Get() interface{} { return k.value }
++
++// String formats the held value with the %v verb.
++func (k AnnotationKey) String() string { return fmt.Sprintf("%v", k.Get()) }
++
++// Described represents an AMQP described type, which is really
++// just a pair of AMQP values - the first is treated as a "descriptor",
++// and is normally a string or ulong providing information about the type.
++// The second is the "value" and can be any AMQP value.
++type Described struct {
++ // Descriptor is the first value of the pair.
++ Descriptor interface{}
++ // Value is the second value of the pair.
++ Value interface{}
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/847a83cc/amqp/types_test.go
----------------------------------------------------------------------
diff --cc amqp/types_test.go
index 0000000,0000000..959a558
new file mode 100644
--- /dev/null
+++ b/amqp/types_test.go
@@@ -1,0 -1,0 +1,197 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package amqp
++
++import (
++ "fmt"
++ "reflect"
++ "testing"
++)
++
++// checkEqual compares want and got with reflect.DeepEqual. It returns nil
++// when they are equal, otherwise an error showing both values in %#v form.
++func checkEqual(want interface{}, got interface{}) error {
++ if !reflect.DeepEqual(want, got) {
++ return fmt.Errorf("%#v != %#v", want, got)
++ }
++ return nil
++}
++
++// checkUnmarshal unmarshals marshalled into v. It returns the Unmarshal
++// error if there is one, and otherwise an error if the count returned by
++// Unmarshal differs from len(marshalled).
++func checkUnmarshal(marshalled []byte, v interface{}) error {
++ got, err := Unmarshal(marshalled, v)
++ if err != nil {
++ return err
++ }
++ if got != len(marshalled) {
++ return fmt.Errorf("Wanted to Unmarshal %v bytes, got %v", len(marshalled), got)
++ }
++ return nil
++}
++
++// ExampleKey shows how to build an AnnotationKey from a Symbol or a uint64
++// and how to read the value back with Get.
++// NOTE(review): the testing package expects the suffix of an Example
++// function to name an existing identifier; there is no Key in the code
++// shown here, so ExampleAnnotationKey may be the intended name - confirm.
++func ExampleKey() {
++ var k AnnotationKey = AnnotationKeySymbol(Symbol("foo"))
++ fmt.Println(k.Get().(Symbol))
++ k = AnnotationKeyUint64(42)
++ fmt.Println(k.Get().(uint64))
++ // Output:
++ // foo
++ // 42
++}
++
++// Values that are unchanged by a marshal/unmarshal round-trip from interface{}
++// to interface{}
++var rtValues = []interface{}{
++ true,
++ int8(-8), int16(-16), int32(-32), int64(-64),
++ uint8(8), uint16(16), uint32(32), uint64(64),
++ float32(0.32), float64(0.64),
++ "string", Binary("Binary"), Symbol("symbol"),
++ nil,
++ Map{"V": "X"},
++ List{"V", int32(1)},
++ Described{"D", "V"},
++}
++
++// Go values that unmarshal as an equivalent value but a different type
++// if unmarshalled to interface{}.
++var oddValues = []interface{}{
++ int(-99), uint(99), // [u]int32|64
++ []byte("byte"), // amqp.Binary
++ map[string]int{"str": 99}, // amqp.Map
++ []string{"a", "b"}, // amqp.List
++}
++
++var allValues = append(rtValues, oddValues...)
++
++// %v formatted representation of allValues.
++// TestTypesPrint looks entries up by position, so this list must stay in
++// the same order and have the same length as allValues.
++var vstrings = []string{
++ // for rtValues
++ "true",
++ "-8", "-16", "-32", "-64",
++ "8", "16", "32", "64",
++ "0.32", "0.64",
++ "string", "Binary", "symbol",
++ "<nil>",
++ "map[V:X]",
++ "[V 1]",
++ "{D V}",
++ // for oddValues
++ "-99", "99",
++ "[98 121 116 101]", /*"byte"*/
++ "map[str:99]",
++ "[a b]",
++}
++
++// Round-trip encoding test: every value in rtValues is marshaled and then
++// unmarshaled into an interface{}, and the result must equal the value.
++func TestTypesRoundTrip(t *testing.T) {
++ for _, x := range rtValues {
++ marshalled, err := Marshal(x, nil)
++ if err != nil {
++ t.Error(err)
++ }
++ var v interface{}
++ if err := checkUnmarshal(marshalled, &v); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual(v, x); err != nil {
++ // Report only the error; t itself is not part of the message.
++ t.Error(err)
++ }
++ }
++}
++
++// Round trip from T to T where T is the type of the value: every value in
++// allValues is marshaled and then unmarshaled into a new variable of the
++// same Go type, and the result must equal the value.
++func TestTypesRoundTripAll(t *testing.T) {
++ for _, x := range allValues {
++ marshalled, err := Marshal(x, nil)
++ if err != nil {
++ t.Error(err)
++ }
++ if x == nil { // We can't create an instance of nil to unmarshal to.
++ continue
++ }
++ vp := reflect.New(reflect.TypeOf(x)) // vp points to a zero value of the same type as x
++ if err := checkUnmarshal(marshalled, vp.Interface()); err != nil {
++ t.Error(err)
++ }
++ v := vp.Elem().Interface()
++ if err := checkEqual(v, x); err != nil {
++ t.Error(err)
++ }
++ }
++}
++
++// TestTypesPrint checks that formatting each value of allValues with %v
++// gives the string at the same index in vstrings.
++func TestTypesPrint(t *testing.T) {
++ // Default %v representations of rtValues and oddValues
++ for i, x := range allValues {
++ if s := fmt.Sprintf("%v", x); vstrings[i] != s {
++ t.Errorf("printing %T: want %v, got %v", x, vstrings[i], s)
++ }
++ }
++}
++
++// TestDescribed marshals a Described value and checks that it can be
++// unmarshaled into a Described, into an interface{}, and into the Go type
++// of its Value alone (dropping the descriptor). It then repeats the
++// Described and interface{} checks for a nested described type.
++func TestDescribed(t *testing.T) {
++ want := Described{"D", "V"}
++ marshalled, err := Marshal(want, nil)
++ if err != nil {
++ // Nothing below is meaningful without the encoded bytes.
++ t.Fatal(err)
++ }
++
++ // Unmarshal to Described type
++ var d Described
++ if err := checkUnmarshal(marshalled, &d); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual(want, d); err != nil {
++ t.Error(err)
++ }
++
++ // Unmarshal to interface{}
++ var i interface{}
++ if err := checkUnmarshal(marshalled, &i); err != nil {
++ t.Error(err)
++ }
++ if _, ok := i.(Described); !ok {
++ t.Errorf("Expected Described, got %T(%v)", i, i)
++ }
++ if err := checkEqual(want, i); err != nil {
++ t.Error(err)
++ }
++
++ // Unmarshal value only (drop descriptor) to the value type
++ var s string
++ if err := checkUnmarshal(marshalled, &s); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual(want.Value, s); err != nil {
++ t.Error(err)
++ }
++
++ // Nested described types
++ want = Described{Described{int64(123), true}, "foo"}
++ marshalled, err = Marshal(want, nil)
++ if err != nil {
++ t.Fatal(err)
++ }
++ if err := checkUnmarshal(marshalled, &d); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual(want, d); err != nil {
++ t.Error(err)
++ }
++ // Nested to interface
++ if err := checkUnmarshal(marshalled, &i); err != nil {
++ t.Error(err)
++ }
++ if err := checkEqual(want, i); err != nil {
++ t.Error(err)
++ }
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/847a83cc/amqp/unmarshal.go
----------------------------------------------------------------------
diff --cc amqp/unmarshal.go
index d56cbd2,0000000..253d66d
mode 100644,000000..100644
--- a/amqp/unmarshal.go
+++ b/amqp/unmarshal.go
@@@ -1,567 -1,0 +1,638 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "reflect"
++ "strings"
+ "unsafe"
+)
+
+const minDecode = 1024
+
+// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
+type UnmarshalError struct {
+ // The name of the AMQP type.
+ AMQPType string
+ // The Go type.
+ GoType reflect.Type
+
+ s string
+}
+
+func (e UnmarshalError) Error() string { return e.s }
+
- func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
++func newUnmarshalErrorMsg(pnType C.pn_type_t, v interface{}, msg string) *UnmarshalError {
++ if len(msg) > 0 && !strings.HasPrefix(msg, ":") {
++ msg = ": " + msg
++ }
+ e := &UnmarshalError{AMQPType: C.pn_type_t(pnType).String(), GoType: reflect.TypeOf(v)}
+ if e.GoType.Kind() != reflect.Ptr {
- e.s = fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType)
++ e.s = fmt.Sprintf("cannot unmarshal to type %s, not a pointer%s", e.GoType, msg)
+ } else {
- e.s = fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
++ e.s = fmt.Sprintf("cannot unmarshal AMQP %s to %s%s", e.AMQPType, e.GoType, msg)
+ }
+ return e
+}
+
++func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
++ return newUnmarshalErrorMsg(pnType, v, "")
++}
++
+func newUnmarshalErrorData(data *C.pn_data_t, v interface{}) *UnmarshalError {
+ err := PnError(C.pn_data_error(data))
+ if err == nil {
+ return nil
+ }
+ e := newUnmarshalError(C.pn_data_type(data), v)
+ e.s = e.s + ": " + err.Error()
+ return e
+}
+
+func recoverUnmarshal(err *error) {
+ if r := recover(); r != nil {
+ if uerr, ok := r.(*UnmarshalError); ok {
+ *err = uerr
+ } else {
+ panic(r)
+ }
+ }
+}
+
+//
+// Decoding from a pn_data_t
+//
+// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
+// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
+//
+
+// Decoder decodes AMQP values from an io.Reader.
+//
+type Decoder struct {
+ reader io.Reader
+ buffer bytes.Buffer
+}
+
+// NewDecoder returns a new decoder that reads from r.
+//
+// The decoder has its own buffer and may read more data than required for the
+// AMQP values requested. Use Buffered to see if there is data left in the
+// buffer.
+//
+func NewDecoder(r io.Reader) *Decoder {
+ return &Decoder{r, bytes.Buffer{}}
+}
+
+// Buffered returns a reader of the data remaining in the Decoder's buffer. The
+// reader is valid until the next call to Decode.
+//
+func (d *Decoder) Buffered() io.Reader {
+ return bytes.NewReader(d.buffer.Bytes())
+}
+
+// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
+//
+// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
+//
+func (d *Decoder) Decode(v interface{}) (err error) {
+ defer recoverUnmarshal(&err)
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ var n int
+ for n == 0 {
+ n, err = decode(data, d.buffer.Bytes())
+ if err != nil {
+ return err
+ }
+ if n == 0 { // n == 0 means not enough data, read more
+ err = d.more()
+ } else {
+ unmarshal(v, data)
+ }
+ }
+ d.buffer.Next(n)
+ return
+}
+
+/*
- Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
++Unmarshal decodes AMQP-encoded bytes and stores the result in the Go value pointed to by v.
+Types are converted as follows:
+
- +---------------------------+----------------------------------------------------------------------+
- |To Go types |From AMQP types |
- +===========================+======================================================================+
- |bool |bool |
- +---------------------------+----------------------------------------------------------------------+
- |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long. |
- |int32, int64 | |
- +---------------------------+----------------------------------------------------------------------+
- |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, |
- |uint32, uint64 types |ulong |
- +---------------------------+----------------------------------------------------------------------+
- |float32, float64 |Equivalent or smaller float or double. |
- +---------------------------+----------------------------------------------------------------------+
- |string, []byte |string, symbol or binary. |
- +---------------------------+----------------------------------------------------------------------+
- |Symbol |symbol |
- +---------------------------+----------------------------------------------------------------------+
- |map[K]T |map, provided all keys and values can unmarshal to types K, T |
- +---------------------------+----------------------------------------------------------------------+
- |Map |map, any AMQP map |
- +---------------------------+----------------------------------------------------------------------+
- |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: |
- | +------------------------+---------------------------------------------+
- | |AMQP Type |Go Type in interface{} |
- | +========================+=============================================+
- | |bool |bool |
- | +------------------------+---------------------------------------------+
- | |byte,short,int,long |int8,int16,int32,int64 |
- | +------------------------+---------------------------------------------+
- | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 |
- | +------------------------+---------------------------------------------+
- | |float, double |float32, float64 |
- | +------------------------+---------------------------------------------+
- | |string |string |
- | +------------------------+---------------------------------------------+
- | |symbol |Symbol |
- | +------------------------+---------------------------------------------+
- | |binary |Binary |
- | +------------------------+---------------------------------------------+
- | |nulll |nil |
- | +------------------------+---------------------------------------------+
- | |map |Map |
- | +------------------------+---------------------------------------------+
- | |list |List |
- +---------------------------+------------------------+---------------------------------------------+
-
- The following Go types cannot be unmarshaled: uintptr, function, interface, channel.
-
- TODO
-
- Go types: array, struct.
-
- AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies.
-
- AMQP maps with mixed/unhashable key types need an alternate representation.
-
- Described types.
++ +------------------------+-------------------------------------------------+
++ |To Go types |From AMQP types |
++ +========================+=================================================+
++ |bool |bool |
++ +------------------------+-------------------------------------------------+
++ |int, int8, int16, int32,|Equivalent or smaller signed integer type: byte, |
++ |int64 |short, int, long. |
++ +------------------------+-------------------------------------------------+
++ |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: |
++ |uint32, uint64 |ubyte, ushort, uint, ulong |
++ +------------------------+-------------------------------------------------+
++ |float32, float64 |Equivalent or smaller float or double. |
++ +------------------------+-------------------------------------------------+
++ |string, []byte |string, symbol or binary. |
++ +------------------------+-------------------------------------------------+
++ |Symbol |symbol |
++ +------------------------+-------------------------------------------------+
++ |map[K]T |map, provided all keys and values can unmarshal |
++ | |to types K,T |
++ +------------------------+-------------------------------------------------+
++ |Map |map, any AMQP map |
++ +------------------------+-------------------------------------------------+
++ |Described |described type |
++ +------------------------+-------------------------------------------------+
++
++An AMQP described type can unmarshal into the corresponding plain type, discarding the descriptor.
++For example an AMQP described string can unmarshal into a plain go string.
++Unmarshal into the Described type preserves the descriptor.
++
++Any AMQP type can unmarshal to an interface{}; the Go type used to unmarshal is chosen from the AMQP type as follows:
++
++ +------------------------+-------------------------------------------------+
++ |AMQP Type |Go Type in interface{} |
++ +========================+=================================================+
++ |bool |bool |
++ +------------------------+-------------------------------------------------+
++ |byte,short,int,long |int8,int16,int32,int64 |
++ +------------------------+-------------------------------------------------+
++ |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 |
++ +------------------------+-------------------------------------------------+
++ |float, double |float32, float64 |
++ +------------------------+-------------------------------------------------+
++ |string |string |
++ +------------------------+-------------------------------------------------+
++ |symbol |Symbol |
++ +------------------------+-------------------------------------------------+
++ |binary |Binary |
++ +------------------------+-------------------------------------------------+
++ |null |nil |
++ +------------------------+-------------------------------------------------+
++ |map |Map |
++ +------------------------+-------------------------------------------------+
++ |list |List |
++ +------------------------+-------------------------------------------------+
++ |described type |Described |
++ +------------------------+-------------------------------------------------+
++
++The following Go types cannot be unmarshaled: uintptr, function, interface, channel, array (use slice), struct
++
++TODO: Not yet implemented:
++
++AMQP types: decimal32/64/128, char (round trip), timestamp, uuid.
++
++AMQP maps with mixed key types, or key types that are not legal Go map keys.
+*/
+func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
+ defer recoverUnmarshal(&err)
+
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ n, err = decode(data, bytes)
+ if err != nil {
+ return 0, err
+ }
+ if n == 0 {
+ return 0, fmt.Errorf("not enough data")
+ } else {
+ unmarshal(v, data)
+ }
+ return n, nil
+}
+
++// Internal
++func UnmarshalUnsafe(pn_data unsafe.Pointer, v interface{}) (err error) {
++ defer recoverUnmarshal(&err)
++ unmarshal(v, (*C.pn_data_t)(pn_data))
++ return
++}
++
+// more reads more data when we can't parse a complete AMQP type
+func (d *Decoder) more() error {
+ var readSize int64 = minDecode
+ if int64(d.buffer.Len()) > readSize { // Grow by doubling
+ readSize = int64(d.buffer.Len())
+ }
+ var n int64
+ n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
+ if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
+ err = io.EOF
+ }
+ return err
+}
+
+// Unmarshal from data into value pointed at by v.
+func unmarshal(v interface{}, data *C.pn_data_t) {
+ pnType := C.pn_data_type(data)
++
++ // Check for PN_DESCRIBED first, as described types can unmarshal into any of the Go types.
++ // Interfaces are handled in the switch below, even for described types.
++ if _, isInterface := v.(*interface{}); !isInterface && bool(C.pn_data_is_described(data)) {
++ getDescribed(data, v)
++ return
++ }
++
++ // Unmarshal based on the target type
+ switch v := v.(type) {
+ case *bool:
+ switch pnType {
+ case C.PN_BOOL:
+ *v = bool(C.pn_data_get_bool(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *int8:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int8(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int8(C.pn_data_get_byte(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *uint8:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint8(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint8(C.pn_data_get_ubyte(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *int16:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int16(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int16(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int16(C.pn_data_get_short(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *uint16:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint16(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint16(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint16(C.pn_data_get_ushort(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *int32:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int32(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int32(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int32(C.pn_data_get_short(data))
+ case C.PN_INT:
+ *v = int32(C.pn_data_get_int(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *uint32:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint32(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint32(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint32(C.pn_data_get_ushort(data))
+ case C.PN_UINT:
+ *v = uint32(C.pn_data_get_uint(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *int64:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int64(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int64(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int64(C.pn_data_get_short(data))
+ case C.PN_INT:
+ *v = int64(C.pn_data_get_int(data))
+ case C.PN_LONG:
+ *v = int64(C.pn_data_get_long(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *uint64:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint64(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint64(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint64(C.pn_data_get_ushort(data))
+ case C.PN_ULONG:
+ *v = uint64(C.pn_data_get_ulong(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *int:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int(C.pn_data_get_short(data))
+ case C.PN_INT:
+ *v = int(C.pn_data_get_int(data))
+ case C.PN_LONG:
+ if unsafe.Sizeof(int(0)) == 8 {
+ *v = int(C.pn_data_get_long(data))
+ } else {
+ panic(newUnmarshalError(pnType, v))
+ }
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *uint:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint(C.pn_data_get_ushort(data))
+ case C.PN_UINT:
+ *v = uint(C.pn_data_get_uint(data))
+ case C.PN_ULONG:
+ if unsafe.Sizeof(int(0)) == 8 {
+ *v = uint(C.pn_data_get_ulong(data))
+ } else {
+ panic(newUnmarshalError(pnType, v))
+ }
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *float32:
+ switch pnType {
+ case C.PN_FLOAT:
+ *v = float32(C.pn_data_get_float(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *float64:
+ switch pnType {
+ case C.PN_FLOAT:
+ *v = float64(C.pn_data_get_float(data))
+ case C.PN_DOUBLE:
+ *v = float64(C.pn_data_get_double(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *string:
+ switch pnType {
+ case C.PN_STRING:
+ *v = goString(C.pn_data_get_string(data))
+ case C.PN_SYMBOL:
+ *v = goString(C.pn_data_get_symbol(data))
+ case C.PN_BINARY:
+ *v = goString(C.pn_data_get_binary(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *[]byte:
+ switch pnType {
+ case C.PN_STRING:
+ *v = goBytes(C.pn_data_get_string(data))
+ case C.PN_SYMBOL:
+ *v = goBytes(C.pn_data_get_symbol(data))
+ case C.PN_BINARY:
+ *v = goBytes(C.pn_data_get_binary(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *Binary:
+ switch pnType {
+ case C.PN_BINARY:
+ *v = Binary(goBytes(C.pn_data_get_binary(data)))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *Symbol:
+ switch pnType {
+ case C.PN_SYMBOL:
+ *v = Symbol(goBytes(C.pn_data_get_symbol(data)))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *interface{}:
+ getInterface(data, v)
+
- default:
++ case *AnnotationKey:
++ if pnType == C.PN_ULONG || pnType == C.PN_SYMBOL || pnType == C.PN_STRING {
++ unmarshal(&v.value, data)
++ } else {
++ panic(newUnmarshalError(pnType, v))
++ }
++
++ default: // This is not one of the fixed well-known types, reflect for map and slice types
+ if reflect.TypeOf(v).Kind() != reflect.Ptr {
+ panic(newUnmarshalError(pnType, v))
+ }
+ switch reflect.TypeOf(v).Elem().Kind() {
+ case reflect.Map:
+ getMap(data, v)
+ case reflect.Slice:
+ getList(data, v)
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ }
+ if err := newUnmarshalErrorData(data, v); err != nil {
+ panic(err)
+ }
+ return
+}
+
+func rewindUnmarshal(v interface{}, data *C.pn_data_t) {
+ C.pn_data_rewind(data)
+ C.pn_data_next(data)
+ unmarshal(v, data)
+}
+
+// Getting into an interface is driven completely by the AMQP type, since the interface{}
+// target is type-neutral.
+func getInterface(data *C.pn_data_t, v *interface{}) {
+ pnType := C.pn_data_type(data)
+ switch pnType {
+ case C.PN_BOOL:
+ *v = bool(C.pn_data_get_bool(data))
+ case C.PN_UBYTE:
+ *v = uint8(C.pn_data_get_ubyte(data))
+ case C.PN_BYTE:
+ *v = int8(C.pn_data_get_byte(data))
+ case C.PN_USHORT:
+ *v = uint16(C.pn_data_get_ushort(data))
+ case C.PN_SHORT:
+ *v = int16(C.pn_data_get_short(data))
+ case C.PN_UINT:
+ *v = uint32(C.pn_data_get_uint(data))
+ case C.PN_INT:
+ *v = int32(C.pn_data_get_int(data))
+ case C.PN_CHAR:
+ *v = uint8(C.pn_data_get_char(data))
+ case C.PN_ULONG:
+ *v = uint64(C.pn_data_get_ulong(data))
+ case C.PN_LONG:
+ *v = int64(C.pn_data_get_long(data))
+ case C.PN_FLOAT:
+ *v = float32(C.pn_data_get_float(data))
+ case C.PN_DOUBLE:
+ *v = float64(C.pn_data_get_double(data))
+ case C.PN_BINARY:
+ *v = Binary(goBytes(C.pn_data_get_binary(data)))
+ case C.PN_STRING:
+ *v = goString(C.pn_data_get_string(data))
+ case C.PN_SYMBOL:
+ *v = Symbol(goString(C.pn_data_get_symbol(data)))
+ case C.PN_MAP:
+ m := make(Map)
+ unmarshal(&m, data)
+ *v = m
+ case C.PN_LIST:
+ l := make(List, 0)
+ unmarshal(&l, data)
+ *v = l
- default: // No data (-1 or NULL)
++ case C.PN_DESCRIBED:
++ d := Described{}
++ unmarshal(&d, data)
++ *v = d
++ case C.PN_NULL:
+ *v = nil
++ case C.PN_INVALID:
++ // Allow decoding from an empty data object to an interface, treat it like NULL.
++ // This happens when optional values or properties are omitted from a message.
++ *v = nil
++ default: // Don't know how to handle this
++ panic(newUnmarshalError(pnType, v))
+ }
+}
+
+// get into map pointed at by v
+func getMap(data *C.pn_data_t, v interface{}) {
+ mapValue := reflect.ValueOf(v).Elem()
+ mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
+ switch pnType := C.pn_data_type(data); pnType {
+ case C.PN_MAP:
+ count := int(C.pn_data_get_map(data))
+ if bool(C.pn_data_enter(data)) {
+ defer C.pn_data_exit(data)
+ for i := 0; i < count/2; i++ {
+ if bool(C.pn_data_next(data)) {
+ key := reflect.New(mapValue.Type().Key())
+ unmarshal(key.Interface(), data)
+ if bool(C.pn_data_next(data)) {
+ val := reflect.New(mapValue.Type().Elem())
+ unmarshal(val.Interface(), data)
+ mapValue.SetMapIndex(key.Elem(), val.Elem())
+ }
+ }
+ }
+ }
+ default: // Empty/error/unknown, leave map empty
+ }
+}
+
+func getList(data *C.pn_data_t, v interface{}) {
+ pnType := C.pn_data_type(data)
+ if pnType != C.PN_LIST {
+ panic(newUnmarshalError(pnType, v))
+ }
+ count := int(C.pn_data_get_list(data))
+ listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
+ if bool(C.pn_data_enter(data)) {
+ for i := 0; i < count; i++ {
+ if bool(C.pn_data_next(data)) {
+ val := reflect.New(listValue.Type().Elem())
+ unmarshal(val.Interface(), data)
+ listValue.Index(i).Set(val.Elem())
+ }
+ }
+ C.pn_data_exit(data)
+ }
+ reflect.ValueOf(v).Elem().Set(listValue)
+}
+
++func getDescribed(data *C.pn_data_t, v interface{}) {
++ d, _ := v.(*Described)
++ pnType := C.pn_data_type(data)
++ if bool(C.pn_data_enter(data)) {
++ defer C.pn_data_exit(data)
++ if bool(C.pn_data_next(data)) {
++ if d != nil {
++ unmarshal(&d.Descriptor, data)
++ }
++ if bool(C.pn_data_next(data)) {
++ if d != nil {
++ unmarshal(&d.Value, data)
++ } else {
++ unmarshal(v, data)
++ }
++ return
++ }
++ }
++ }
++ // The pn_data cursor didn't move as expected
++ panic(newUnmarshalErrorMsg(pnType, v, "bad described value encoding"))
++}
++
+// decode from bytes.
+// Return bytes decoded or 0 if we could not decode a complete object.
+//
+func decode(data *C.pn_data_t, bytes []byte) (int, error) {
+ if len(bytes) == 0 {
+ return 0, nil
+ }
+ n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
+ if n == int(C.PN_UNDERFLOW) {
+ C.pn_error_clear(C.pn_data_error(data))
+ return 0, nil
+ } else if n <= 0 {
+ return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n))
+ }
+ return n, nil
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org