You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2017/01/10 15:58:52 UTC
[31/55] [partial] qpid-proton-j git commit: PROTON-1385: retain
proton-j content only, the rest remains in the other repo at:
https://git-wip-us.apache.org/repos/asf/qpid-proton.git
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go
deleted file mode 100644
index b3e27bc..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go
+++ /dev/null
@@ -1,385 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-// Test that conversion of Go type to/from AMQP is compatible with other
-// bindings.
-//
-package amqp
-
-import (
- "bytes"
- "fmt"
- "io"
- "io/ioutil"
- "os"
- "reflect"
- "strings"
- "testing"
-)
-
-// checkEqual compares want and got with reflect.DeepEqual. It returns nil
-// when they are equal and otherwise an error showing both values in Go syntax.
-func checkEqual(want interface{}, got interface{}) error {
-	if reflect.DeepEqual(want, got) {
-		return nil
-	}
-	return fmt.Errorf("%#v != %#v", want, got)
-}
-
-// getReader opens the interop data file "<name>.amqp" in the directory named
-// by the PN_INTEROP_DIR environment variable. The calling test is skipped
-// when the variable is empty and fails immediately when the file cannot be
-// opened. The opened file is not closed by this helper.
-func getReader(t *testing.T, name string) (r io.Reader) {
-	interopDir := os.Getenv("PN_INTEROP_DIR")
-	if interopDir == "" {
-		t.Skip("no PN_INTEROP_DIR in environment")
-	}
-	file, err := os.Open(interopDir + "/" + name + ".amqp")
-	if err != nil {
-		t.Fatalf("can't open %#v: %v", name, err)
-	}
-	return file
-}
-
-// remaining returns everything the decoder has not consumed yet: first its
-// buffered bytes, then the rest of its underlying reader. Read errors are
-// ignored; whatever was read is returned.
-func remaining(d *Decoder) string {
-	rest := io.MultiReader(d.Buffered(), d.reader)
-	leftover, _ := ioutil.ReadAll(rest)
-	return string(leftover)
-}
-
-// checkDecode decodes the next value from d into gotPtr and checks it against
-// want. gotPtr must be a pointer to a value of the same type as want. After a
-// successful decode, want is also round-tripped through Marshal/Unmarshal and
-// checked again, including the number of bytes Unmarshal reports as consumed.
-// Failures are reported on t and end the check early.
-func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) {
-	// decoded returns the value gotPtr currently points at.
-	decoded := func() interface{} { return reflect.ValueOf(gotPtr).Elem().Interface() }
-
-	err := d.Decode(gotPtr)
-	if err != nil {
-		t.Error("Decode failed", err)
-		return
-	}
-	if err = checkEqual(want, decoded()); err != nil {
-		t.Error("Decode bad value:", err)
-		return
-	}
-
-	// Round trip: encode the expected value and decode it back into gotPtr.
-	encoded, err := Marshal(want, nil)
-	if err != nil {
-		t.Error("Marshal failed", err)
-		return
-	}
-	consumed, err := Unmarshal(encoded, gotPtr)
-	if err != nil {
-		t.Error("Unmarshal failed", err)
-		return
-	}
-	if err = checkEqual(consumed, len(encoded)); err != nil {
-		t.Error("Bad unmarshal length", err)
-		return
-	}
-	if err = checkEqual(want, decoded()); err != nil {
-		t.Error("Bad unmarshal value", err)
-	}
-}
-
-// TestUnmarshal reads the whole "strings" interop file and unmarshals its
-// values one after another from the same byte slice, advancing by the number
-// of bytes Unmarshal reports as consumed.
-func TestUnmarshal(t *testing.T) {
-	data, err := ioutil.ReadAll(getReader(t, "strings"))
-	if err != nil {
-		// Nothing meaningful can be unmarshaled from a partial read.
-		t.Fatal(err)
-	}
-	for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
-		var got string
-		n, err := Unmarshal(data, &got)
-		if err != nil {
-			// Stop here: after a failure the consumed count cannot be
-			// trusted, so continuing would only produce follow-on errors.
-			t.Fatal(err)
-		}
-		if want != got {
-			t.Errorf("%#v != %#v", want, got)
-		}
-		data = data[n:]
-	}
-}
-
-// TestPrimitivesExact decodes the "primitives" interop file into Go variables
-// whose types match each encoded value exactly.
-func TestPrimitivesExact(t *testing.T) {
-	// One destination per exact type; b is reused for both booleans.
-	var (
-		b   bool
-		u8  uint8
-		u16 uint16
-		i16 int16
-		u32 uint32
-		i32 int32
-		u64 uint64
-		i64 int64
-		f32 float32
-		f64 float64
-	)
-	d := NewDecoder(getReader(t, "primitives"))
-	checkDecode(d, true, &b, t)
-	checkDecode(d, false, &b, t)
-	checkDecode(d, uint8(42), &u8, t)
-	checkDecode(d, uint16(42), &u16, t)
-	checkDecode(d, int16(-42), &i16, t)
-	checkDecode(d, uint32(12345), &u32, t)
-	checkDecode(d, int32(-12345), &i32, t)
-	checkDecode(d, uint64(12345), &u64, t)
-	checkDecode(d, int64(-12345), &i64, t)
-	checkDecode(d, float32(0.125), &f32, t)
-	checkDecode(d, float64(0.125), &f64, t)
-}
-
-// TestPrimitivesCompatible decodes the "primitives" interop file into the
-// general Go types bool, int, uint and float64 instead of exact-width types.
-func TestPrimitivesCompatible(t *testing.T) {
-	// Destinations are shared between all values of a compatible kind.
-	var (
-		b bool
-		i int
-		u uint
-		f float64
-	)
-	d := NewDecoder(getReader(t, "primitives"))
-	checkDecode(d, true, &b, t)
-	checkDecode(d, false, &b, t)
-	checkDecode(d, uint(42), &u, t)
-	checkDecode(d, uint(42), &u, t)
-	checkDecode(d, -42, &i, t)
-	checkDecode(d, uint(12345), &u, t)
-	checkDecode(d, -12345, &i, t)
-	checkDecode(d, uint(12345), &u, t)
-	checkDecode(d, -12345, &i, t)
-	checkDecode(d, 0.125, &f, t)
-	checkDecode(d, 0.125, &f, t)
-}
-
-// checkDecodeInterface decodes the next value from d into an empty
-// interface{} and checks it against want. The decoded value is then
-// round-tripped through Marshal/Unmarshal into a second interface{} and
-// checked again, including the number of bytes Unmarshal reports as consumed.
-func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) {
-	var first interface{}
-	if err := d.Decode(&first); err != nil {
-		t.Error("Decode failed", err)
-		return
-	}
-	if err := checkEqual(want, first); err != nil {
-		t.Error(err)
-		return
-	}
-
-	// Round trip: re-encode what was decoded and decode it once more.
-	encoded, err := Marshal(first, nil)
-	if err != nil {
-		t.Error(err)
-		return
-	}
-	var second interface{}
-	consumed, err := Unmarshal(encoded, &second)
-	if err != nil {
-		t.Error(err)
-		return
-	}
-	if err = checkEqual(consumed, len(encoded)); err != nil {
-		t.Error(err)
-		return
-	}
-	if err = checkEqual(want, second); err != nil {
-		t.Error(err)
-	}
-}
-
-// TestPrimitivesInterface decodes each value of the "primitives" interop file
-// into an interface{} and expects the exact-width Go type listed below.
-func TestPrimitivesInterface(t *testing.T) {
-	d := NewDecoder(getReader(t, "primitives"))
-	// Expected values, in the order they appear in the file.
-	expected := []interface{}{
-		true,
-		false,
-		uint8(42),
-		uint16(42),
-		int16(-42),
-		uint32(12345),
-		int32(-12345),
-		uint64(12345),
-		int64(-12345),
-		float32(0.125),
-		float64(0.125),
-	}
-	for _, want := range expected {
-		checkDecodeInterface(d, want, t)
-	}
-}
-
-// TestStrings decodes the "strings" interop file twice, first into plain Go
-// strings and then into []byte/string destinations, checking each time that
-// no input is left over. It finishes with a few error cases for Decode and
-// Unmarshal.
-func TestStrings(t *testing.T) {
-	d := NewDecoder(getReader(t, "strings"))
-	// Test decoding as plain Go strings
-	for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
-		var got string
-		checkDecode(d, want, &got, t)
-	}
-	remains := remaining(d)
-	if remains != "" {
-		t.Errorf("leftover: %s", remains)
-	}
-
-	// Test decoding as specific string types
-	d = NewDecoder(getReader(t, "strings"))
-	var bin []byte
-	var str, sym string
-	checkDecode(d, []byte("abc\000defg"), &bin, t)
-	checkDecode(d, "abcdefg", &str, t)
-	checkDecode(d, "abcdefg", &sym, t)
-	checkDecode(d, make([]byte, 0), &bin, t)
-	checkDecode(d, "", &str, t)
-	checkDecode(d, "", &sym, t)
-	remains = remaining(d)
-	if remains != "" {
-		t.Fatalf("leftover: %s", remains)
-	}
-
-	// Test some error handling
-
-	// expectError reports a test failure when err is nil or its text does not
-	// contain substr. The nil check matters: calling err.Error() on a nil
-	// error would panic instead of failing the test cleanly.
-	expectError := func(err error, substr string) {
-		if err == nil {
-			t.Errorf("expected error containing %q, got nil", substr)
-			return
-		}
-		if !strings.Contains(err.Error(), substr) {
-			t.Error(err)
-		}
-	}
-
-	d = NewDecoder(getReader(t, "strings"))
-	var s string
-	err := d.Decode(s) // s is passed by value, not as a pointer, to provoke the error
-	if err == nil {
-		t.Fatal("Expected error")
-	}
-	expectError(err, "not a pointer")
-	var i int
-	expectError(d.Decode(&i), "cannot unmarshal")
-	_, err = Unmarshal([]byte{}, nil)
-	expectError(err, "not enough data")
-	_, err = Unmarshal([]byte("foobar"), nil)
-	expectError(err, "invalid-argument")
-}
-
-// TestEncodeDecode writes several values of different types through an
-// Encoder into a buffer, reads them back with a Decoder in the same order and
-// checks that the result equals the input.
-func TestEncodeDecode(t *testing.T) {
-	type data struct {
-		s  string
-		i  int
-		u8 uint8
-		b  bool
-		f  float32
-		v  interface{}
-	}
-
-	in := data{"foo", 42, 9, true, 1.234, "thing"}
-
-	var buf bytes.Buffer
-	e := NewEncoder(&buf)
-	for _, value := range []interface{}{in.s, in.i, in.u8, in.b, in.f, in.v} {
-		if err := e.Encode(value); err != nil {
-			t.Error(err)
-		}
-	}
-
-	var out data
-	d := NewDecoder(&buf)
-	// Decode into the fields of out in the order they were encoded.
-	for _, field := range []interface{}{&out.s, &out.i, &out.u8, &out.b, &out.f, &out.v} {
-		if err := d.Decode(field); err != nil {
-			t.Error(err)
-		}
-	}
-
-	if err := checkEqual(in, out); err != nil {
-		t.Error(err)
-	}
-}
-
-// TestMap decodes the "maps" interop file into a Map, an interface{} and a
-// typed Go map, then round-trips a nested Map through Marshal/Unmarshal.
-func TestMap(t *testing.T) {
-	d := NewDecoder(getReader(t, "maps"))
-
-	// First value decoded into the generic Map type.
-	var generic Map
-	checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &generic, t)
-
-	// Second value decoded into an interface{}, which should hold a Map.
-	var iface interface{}
-	checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &iface, t)
-
-	// Start over and decode the first value into a specifically typed map.
-	d = NewDecoder(getReader(t, "maps"))
-	var typed map[string]int
-	checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &typed, t)
-
-	// A nested map with mixed key types must survive Marshal/Unmarshal.
-	generic = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}}
-	encoded, err := Marshal(generic, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if _, err = Unmarshal(encoded, &iface); err != nil {
-		t.Fatal(err)
-	}
-	if err = checkEqual(generic, iface); err != nil {
-		t.Fatal(err)
-	}
-}
-
-// TestList decodes the two lists stored in the "lists" interop file: a list
-// of mixed values followed by an empty list.
-func TestList(t *testing.T) {
-	var decoded List
-	d := NewDecoder(getReader(t, "lists"))
-	for _, want := range []List{{int32(32), "foo", true}, {}} {
-		checkDecode(d, want, &decoded, t)
-	}
-}
-
-// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded
-// as an AMQP string *inside* an AMQP binary?? Skip the test for now.
-//
-// The TODO_ prefix keeps "go test" from picking this function up. It decodes
-// the "message" interop file, expects the body "hello", and expects a fresh
-// message with that body to encode to the same bytes.
-func TODO_TestMessage(t *testing.T) {
-	encoded, err := ioutil.ReadAll(getReader(t, "message"))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	m, err := DecodeMessage(encoded)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if err := checkEqual(m.Body(), "hello"); err != nil {
-		t.Error(err)
-	}
-
-	m2 := NewMessageWith("hello")
-	reencoded, err := m2.Encode(nil)
-	if err != nil {
-		t.Error(err)
-	} else if err = checkEqual(encoded, reencoded); err != nil {
-		t.Error(err)
-	}
-}
-
-// TODO aconway 2015-03-13: finish the full interop test
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
deleted file mode 100644
index bce7323..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/codec.h>
-import "C"
-
-import (
- "fmt"
- "io"
- "reflect"
- "unsafe"
-)
-
-// dataError returns the error currently recorded on data, with prefix and a
-// colon prepended to its message. It returns the nil result of PnError
-// unchanged when no error is recorded.
-func dataError(prefix string, data *C.pn_data_t) error {
-	pnErr := PnError(C.pn_data_error(data))
-	if pnErr == nil {
-		return pnErr
-	}
-	return fmt.Errorf("%s: %s", prefix, pnErr.Error())
-}
-
-/*
-Marshal encodes a Go value as AMQP data in buffer.
-If buffer is nil, or is not large enough, a new buffer is created.
-
-Returns the buffer used for encoding with len() adjusted to the actual size of data.
-
-Only buffer[0:len(buffer)] is offered to the encoder; spare capacity beyond len() is not used.
-
-Values that cannot be marshaled make the internal marshal() function panic. Marshal defers
-doRecover(&err), which presumably turns such a panic into the returned error
-(NOTE(review): confirm against doRecover, which is defined elsewhere).
-
-Go types are encoded as follows
-
- +-------------------------------------+--------------------------------------------+
- |Go type |AMQP type |
- +-------------------------------------+--------------------------------------------+
- |bool |bool |
- +-------------------------------------+--------------------------------------------+
- |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) |
- +-------------------------------------+--------------------------------------------+
- |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) |
- +-------------------------------------+--------------------------------------------+
- |float32, float64 |float, double. |
- +-------------------------------------+--------------------------------------------+
- |string |string |
- +-------------------------------------+--------------------------------------------+
- |[]byte, Binary |binary |
- +-------------------------------------+--------------------------------------------+
- |Symbol |symbol |
- +-------------------------------------+--------------------------------------------+
- |interface{} |the contained type |
- +-------------------------------------+--------------------------------------------+
- |nil |null |
- +-------------------------------------+--------------------------------------------+
- |map[K]T |map with K and T converted as above |
- +-------------------------------------+--------------------------------------------+
- |Map |map, may have mixed types for keys, values |
- +-------------------------------------+--------------------------------------------+
- |[]T |list with T converted as above |
- +-------------------------------------+--------------------------------------------+
- |List |list, may have mixed types values |
- +-------------------------------------+--------------------------------------------+
-
-The following Go types cannot be marshaled: uintptr, function, interface, channel
-
-TODO
-
-Go types: array, slice, struct, complex64/128.
-(NOTE(review): slices are already encoded as AMQP lists, see the []T row above, so the
-"slice" entry may be out of date.)
-
-AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
-
-Described types.
-
-*/
-func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
- defer doRecover(&err)
- data := C.pn_data(0)
- defer C.pn_data_free(data)
- marshal(v, data)
- encode := func(buf []byte) ([]byte, error) { // one encoding attempt into buf, as required by encodeGrow
- n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
- switch {
- case n == int(C.PN_OVERFLOW): // buf too small: let encodeGrow retry with a bigger buffer
- return buf, overflow
- case n < 0:
- return buf, dataError("marshal error", data)
- default:
- return buf[:n], nil
- }
- }
- return encodeGrow(buffer, encode)
-}
-
-// minEncode is the size of the buffer allocated when the caller supplies none.
-const minEncode = 256
-
-// overflow is the sentinel error an encoding function returns when the data
-// does not fit in the buffer it was given.
-var overflow = fmt.Errorf("buffer too small")
-
-// encodeFn makes one encoding attempt into buffer[0:len(buffer)].
-// On success it returns the buffer with its length adjusted to the encoded
-// data. When the buffer is too small it returns overflow as the error.
-type encodeFn func(buffer []byte) ([]byte, error)
-
-// encodeGrow calls encode with buffer and, for as long as encode reports
-// overflow, retries with a fresh buffer of twice the previous length.
-// A nil or empty buffer is replaced by one of minEncode bytes before the
-// first attempt. It returns the buffer and error of the last attempt.
-func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
-	if len(buffer) == 0 { // also covers a nil buffer
-		buffer = make([]byte, minEncode)
-	}
-	for {
-		out, err := encode(buffer)
-		if err != overflow {
-			return out, err
-		}
-		buffer = make([]byte, 2*len(out))
-	}
-}
-/*
-marshal appends the AMQP form of v to data, choosing the proton "put" call from the
-dynamic Go type of v. Map values, other Go maps and slices are written as containers
-whose entries are marshaled recursively.
-
-It panics if v has a type it cannot encode, or if proton has recorded an error on data
-once the value has been put.
-*/
-func marshal(v interface{}, data *C.pn_data_t) {
- switch v := v.(type) {
- case nil:
- C.pn_data_put_null(data)
- case bool:
- C.pn_data_put_bool(data, C.bool(v))
- case int8:
- C.pn_data_put_byte(data, C.int8_t(v))
- case int16:
- C.pn_data_put_short(data, C.int16_t(v))
- case int32:
- C.pn_data_put_int(data, C.int32_t(v))
- case int64:
- C.pn_data_put_long(data, C.int64_t(v))
- case int: // written as long or int depending on the size of Go's int
- if unsafe.Sizeof(0) == 8 {
- C.pn_data_put_long(data, C.int64_t(v))
- } else {
- C.pn_data_put_int(data, C.int32_t(v))
- }
- case uint8:
- C.pn_data_put_ubyte(data, C.uint8_t(v))
- case uint16:
- C.pn_data_put_ushort(data, C.uint16_t(v))
- case uint32:
- C.pn_data_put_uint(data, C.uint32_t(v))
- case uint64:
- C.pn_data_put_ulong(data, C.uint64_t(v))
- case uint: // written as ulong or uint; the size test uses the untyped constant 0, i.e. the size of int
- if unsafe.Sizeof(0) == 8 {
- C.pn_data_put_ulong(data, C.uint64_t(v))
- } else {
- C.pn_data_put_uint(data, C.uint32_t(v))
- }
- case float32:
- C.pn_data_put_float(data, C.float(v))
- case float64:
- C.pn_data_put_double(data, C.double(v))
- case string:
- C.pn_data_put_string(data, pnBytes([]byte(v)))
- case []byte:
- C.pn_data_put_binary(data, pnBytes(v))
- case Binary:
- C.pn_data_put_binary(data, pnBytes([]byte(v)))
- case Symbol:
- C.pn_data_put_symbol(data, pnBytes([]byte(v)))
- case Map: // Special map type
- C.pn_data_put_map(data)
- C.pn_data_enter(data)
- for key, val := range v { // each entry is written as a key followed by its value
- marshal(key, data)
- marshal(val, data)
- }
- C.pn_data_exit(data)
- default: // no direct case matched: fall back to the reflect kind of v
- switch reflect.TypeOf(v).Kind() {
- case reflect.Map:
- putMap(data, v)
- case reflect.Slice:
- putList(data, v)
- default:
- panic(fmt.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
- }
- }
- err := dataError("marshal", data) // pick up any error proton recorded on data
- if err != nil {
- panic(err)
- }
- return
-}
-// clearMarshal clears data and then marshals v into it, so data ends up holding only v.
-func clearMarshal(v interface{}, data *C.pn_data_t) {
- C.pn_data_clear(data)
- marshal(v, data)
-}
-
-// putMap writes the Go map v to data as an AMQP map. Keys and values are
-// marshaled in the order returned by reflect's MapKeys, each key followed by
-// its value. v must be a map; it is accessed through reflection.
-func putMap(data *C.pn_data_t, v interface{}) {
-	m := reflect.ValueOf(v)
-	keys := m.MapKeys()
-
-	C.pn_data_put_map(data)
-	C.pn_data_enter(data)
-	for _, k := range keys {
-		marshal(k.Interface(), data)
-		marshal(m.MapIndex(k).Interface(), data)
-	}
-	C.pn_data_exit(data)
-}
-
-// putList writes the Go slice v to data as an AMQP list, marshaling the
-// elements in index order. v is accessed through reflection.
-func putList(data *C.pn_data_t, v interface{}) {
-	elems := reflect.ValueOf(v)
-
-	C.pn_data_put_list(data)
-	C.pn_data_enter(data)
-	for i, n := 0, elems.Len(); i < n; i++ {
-		marshal(elems.Index(i).Interface(), data)
-	}
-	C.pn_data_exit(data)
-}
-
-// Encoder encodes Go values as AMQP data and writes the encoded bytes to an io.Writer.
-type Encoder struct {
- writer io.Writer // destination of the encoded bytes
- buffer []byte // passed to Marshal on every Encode and replaced by the slice Marshal returns
-}
-
-// NewEncoder returns a new Encoder that writes to w. The encoder starts with
-// an encoding buffer of minEncode bytes.
-func NewEncoder(w io.Writer) *Encoder {
-	enc := &Encoder{
-		writer: w,
-		buffer: make([]byte, minEncode),
-	}
-	return enc
-}
-
-// Encode marshals v as AMQP data (see Marshal) and writes the encoded bytes
-// to the encoder's writer. It returns the marshaling error, if any, otherwise
-// the error from the write.
-func (e *Encoder) Encode(v interface{}) (err error) {
-	// Marshal returns a slice trimmed to the encoded length. Handing that
-	// trimmed slice back on the next call would shrink the usable buffer to
-	// the size of the previous value and force needless reallocation, so
-	// offer the full capacity of the encoder's own buffer every time.
-	e.buffer, err = Marshal(v, e.buffer[:cap(e.buffer)])
-	if err != nil {
-		return err
-	}
-	_, err = e.writer.Write(e.buffer)
-	return err
-}
-
-// replace discards the current contents of data and marshals v into it.
-// The body used to repeat clearMarshal line for line; it now delegates to
-// that helper so the clear-then-marshal sequence is defined once.
-func replace(data *C.pn_data_t, v interface{}) {
-	clearMarshal(v, data)
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
deleted file mode 100644
index 753682e..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/codec.h>
-// #include <proton/types.h>
-// #include <proton/message.h>
-// #include <stdlib.h>
-//
-// /* Helper for setting message string fields */
-// typedef int (*set_fn)(pn_message_t*, const char*);
-// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
-// int result = set(m, s);
-// free(s);
-// return result;
-// }
-//
-import "C"
-
-import (
- "fmt"
- "runtime"
- "time"
- "unsafe"
-)
-
-// Message is the interface to an AMQP message.
-type Message interface {
- // Durable indicates that any parties taking responsibility
- // for the message must durably store the content.
- Durable() bool
- SetDurable(bool)
-
- // Priority impacts ordering guarantees. Within a
- // given ordered context, higher priority messages may jump ahead of
- // lower priority messages.
- Priority() uint8
- SetPriority(uint8)
-
- // TTL (Time To Live) is the duration after which the message may be dropped.
- TTL() time.Duration
- SetTTL(time.Duration)
-
- // FirstAcquirer indicates
- // that the recipient of the message is the first recipient to acquire
- // the message, i.e. there have been no failed delivery attempts to
- // other acquirers. Note that this does not mean the message has not
- // been delivered to, but not acquired, by other recipients.
- FirstAcquirer() bool
- SetFirstAcquirer(bool)
-
- // DeliveryCount tracks how many attempts have been made to
- // deliver a message.
- DeliveryCount() uint32
- SetDeliveryCount(uint32)
-
- // MessageId provides a unique identifier for a message.
- // It can be a string, an unsigned long, a uuid or a
- // binary value.
- MessageId() interface{}
- SetMessageId(interface{})
-
- UserId() string
- SetUserId(string)
-
- Address() string
- SetAddress(string)
-
- Subject() string
- SetSubject(string)
-
- ReplyTo() string
- SetReplyTo(string)
-
- // CorrelationId is set on correlated request and response messages. It can be
- // a string, an unsigned long, a uuid or a binary value.
- CorrelationId() interface{}
- SetCorrelationId(interface{})
-
- ContentType() string
- SetContentType(string)
-
- ContentEncoding() string
- SetContentEncoding(string)
-
- // ExpiryTime indicates an absolute time when the message may be dropped.
- // A zero time (i.e. t.IsZero() == true) indicates a message never expires.
- ExpiryTime() time.Time
- SetExpiryTime(time.Time)
-
- CreationTime() time.Time
- SetCreationTime(time.Time)
-
- GroupId() string
- SetGroupId(string)
-
- GroupSequence() int32
- SetGroupSequence(int32)
-
- ReplyToGroupId() string
- SetReplyToGroupId(string)
-
- // Instructions - AMQP delivery instructions.
- Instructions() map[string]interface{}
- SetInstructions(v map[string]interface{})
-
- // Annotations - AMQP annotations.
- Annotations() map[string]interface{}
- SetAnnotations(v map[string]interface{})
-
- // Properties - Application properties.
- Properties() map[string]interface{}
- SetProperties(v map[string]interface{})
-
- // Inferred indicates how the message content
- // is encoded into AMQP sections. If inferred is true then binary and
- // list values in the body of the message will be encoded as AMQP DATA
- // and AMQP SEQUENCE sections, respectively. If inferred is false,
- // then all values in the body of the message will be encoded as AMQP
- // VALUE sections regardless of their type.
- Inferred() bool
- SetInferred(bool)
-
- // Marshal a Go value into the message body. See amqp.Marshal() for details.
- Marshal(interface{})
-
- // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
- Unmarshal(interface{})
-
- // Body returns the value produced by unmarshalling the message body into an interface{}.
- Body() interface{}
-
- // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
- // the message is encoded into it, otherwise a new buffer is created.
- // Returns the buffer containing the message.
- Encode(buffer []byte) ([]byte, error)
-
- // Decode data into this message. Overwrites any existing message content.
- Decode(buffer []byte) error
-
- // Clear the message contents.
- Clear()
-
- // Copy the contents of another message to this one.
- Copy(m Message) error
-}
-// message implements Message on top of a proton pn_message_t, held in pn.
-type message struct{ pn *C.pn_message_t }
-// freeMessage frees the proton message held by m and clears the pointer. NewMessage registers it as the finalizer of every message.
-func freeMessage(m *message) {
- C.pn_message_free(m.pn)
- m.pn = nil
-}
-
-// NewMessage creates a new message instance backed by a freshly allocated
-// proton message. A finalizer is registered so that the proton message is
-// freed when the Go value is garbage collected.
-func NewMessage() Message {
-	msg := &message{pn: C.pn_message()}
-	runtime.SetFinalizer(msg, freeMessage)
-	return msg
-}
-
-// NewMessageWith creates a message with value as the body. Equivalent to
-// m := NewMessage(); m.Marshal(value)
-func NewMessageWith(value interface{}) Message {
-	msg := NewMessage()
-	msg.Marshal(value)
-	return msg
-}
-
-// Clear clears the contents of the underlying proton message.
-func (m *message) Clear() {
-	C.pn_message_clear(m.pn)
-}
-
-// Copy replaces the contents of m with those of x by encoding x to AMQP
-// bytes and decoding those bytes into m. It returns the first error from
-// either step.
-func (m *message) Copy(x Message) error {
-	encoded, err := x.Encode(nil)
-	if err != nil {
-		return err
-	}
-	return m.Decode(encoded)
-}
-
-// ==== message get functions
-
-// rewindGet rewinds data, steps to its first node and unmarshals that node
-// into an interface{}, which it returns.
-func rewindGet(data *C.pn_data_t) (v interface{}) {
-	C.pn_data_rewind(data) // start over from the beginning of data
-	C.pn_data_next(data)   // position on the first node
-	unmarshal(&v, data)
-	return
-}
-
-// rewindMap rewinds data, steps to its first node and unmarshals that node
-// into a map[string]interface{}, which it returns.
-func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
-	C.pn_data_rewind(data) // start over from the beginning of data
-	C.pn_data_next(data)   // position on the first node
-	unmarshal(&v, data)
-	return
-}
-
-// Inferred returns the inferred flag of the proton message.
-func (m *message) Inferred() bool {
-	return bool(C.pn_message_is_inferred(m.pn))
-}
-
-// Durable returns the durable flag of the proton message.
-func (m *message) Durable() bool {
-	return bool(C.pn_message_is_durable(m.pn))
-}
-
-// Priority returns the priority of the proton message.
-func (m *message) Priority() uint8 {
-	return uint8(C.pn_message_get_priority(m.pn))
-}
-
-// TTL returns the time to live of the proton message, converting the
-// millisecond count reported by proton to a time.Duration.
-func (m *message) TTL() time.Duration {
-	millis := C.pn_message_get_ttl(m.pn)
-	return time.Duration(millis) * time.Millisecond
-}
-
-// FirstAcquirer returns the first-acquirer flag of the proton message.
-func (m *message) FirstAcquirer() bool {
-	return bool(C.pn_message_is_first_acquirer(m.pn))
-}
-
-// DeliveryCount returns the delivery count of the proton message.
-func (m *message) DeliveryCount() uint32 {
-	return uint32(C.pn_message_get_delivery_count(m.pn))
-}
-
-// MessageId returns the message id, unmarshaled into an interface{}.
-func (m *message) MessageId() interface{} {
-	return rewindGet(C.pn_message_id(m.pn))
-}
-
-// UserId returns the user id bytes of the proton message as a string.
-func (m *message) UserId() string {
-	return goString(C.pn_message_get_user_id(m.pn))
-}
-
-// Address returns the address of the proton message.
-func (m *message) Address() string {
-	return C.GoString(C.pn_message_get_address(m.pn))
-}
-
-// Subject returns the subject of the proton message.
-func (m *message) Subject() string {
-	return C.GoString(C.pn_message_get_subject(m.pn))
-}
-
-// ReplyTo returns the reply-to address of the proton message.
-func (m *message) ReplyTo() string {
-	return C.GoString(C.pn_message_get_reply_to(m.pn))
-}
-
-// CorrelationId returns the correlation id, unmarshaled into an interface{}.
-func (m *message) CorrelationId() interface{} {
-	return rewindGet(C.pn_message_correlation_id(m.pn))
-}
-
-// ContentType returns the content type of the proton message.
-func (m *message) ContentType() string {
-	return C.GoString(C.pn_message_get_content_type(m.pn))
-}
-
-// ContentEncoding returns the content encoding of the proton message.
-func (m *message) ContentEncoding() string {
-	return C.GoString(C.pn_message_get_content_encoding(m.pn))
-}
-
-// ExpiryTime returns the absolute expiry time of the message. The value
-// reported by proton is interpreted as milliseconds since the Unix epoch.
-//
-// A proton value of 0 is returned as the zero time.Time, so that IsZero()
-// is true for a message that never expires, as the Message interface
-// documents. Returning time.Unix(0, 0) for it would not satisfy IsZero().
-func (m *message) ExpiryTime() time.Time {
-	millis := C.pn_message_get_expiry_time(m.pn)
-	if millis == 0 {
-		return time.Time{}
-	}
-	return time.Unix(0, int64(time.Millisecond*time.Duration(millis)))
-}
-// CreationTime returns the creation time of the message. The value reported
-// by proton is interpreted as milliseconds since the Unix epoch.
-func (m *message) CreationTime() time.Time {
-	millis := int64(C.pn_message_get_creation_time(m.pn))
-	nanos := int64(time.Millisecond) * millis
-	return time.Unix(0, nanos)
-}
-// GroupId returns the group id of the proton message.
-func (m *message) GroupId() string {
-	return C.GoString(C.pn_message_get_group_id(m.pn))
-}
-
-// GroupSequence returns the group sequence number of the proton message.
-func (m *message) GroupSequence() int32 {
-	return int32(C.pn_message_get_group_sequence(m.pn))
-}
-
-// ReplyToGroupId returns the reply-to group id of the proton message.
-func (m *message) ReplyToGroupId() string {
-	return C.GoString(C.pn_message_get_reply_to_group_id(m.pn))
-}
-
-// Instructions returns the delivery instructions, unmarshaled into a map.
-func (m *message) Instructions() map[string]interface{} {
-	instructions := C.pn_message_instructions(m.pn)
-	return rewindMap(instructions)
-}
-
-// Annotations returns the message annotations, unmarshaled into a map.
-func (m *message) Annotations() map[string]interface{} {
-	annotations := C.pn_message_annotations(m.pn)
-	return rewindMap(annotations)
-}
-
-// Properties returns the application properties, unmarshaled into a map.
-func (m *message) Properties() map[string]interface{} {
-	properties := C.pn_message_properties(m.pn)
-	return rewindMap(properties)
-}
-
-// ==== message set methods
-
-// setData replaces the contents of data with the AMQP form of v.
-// The body used to repeat clearMarshal line for line; it now delegates to
-// that helper so the clear-then-marshal sequence is defined once.
-func setData(v interface{}, data *C.pn_data_t) {
-	clearMarshal(v, data)
-}
-
-// dataString returns the text that proton's pn_inspect produces for data.
-func dataString(data *C.pn_data_t) string {
-	// C.CString allocates with malloc and the Go garbage collector never
-	// reclaims it, so the initial empty string must be freed explicitly.
-	// NOTE(review): this relies on pn_string copying its argument rather
-	// than keeping the pointer - confirm against proton/object.h.
-	empty := C.CString("")
-	defer C.free(unsafe.Pointer(empty))
-
-	str := C.pn_string(empty)
-	defer C.pn_free(unsafe.Pointer(str))
-	C.pn_inspect(unsafe.Pointer(data), str)
-	return C.GoString(C.pn_string_get(str))
-}
-
-// SetInferred sets the inferred flag of the proton message.
-func (m *message) SetInferred(b bool) {
-	C.pn_message_set_inferred(m.pn, C.bool(b))
-}
-
-// SetDurable sets the durable flag of the proton message.
-func (m *message) SetDurable(b bool) {
-	C.pn_message_set_durable(m.pn, C.bool(b))
-}
-
-// SetPriority sets the priority of the proton message.
-func (m *message) SetPriority(b uint8) {
-	C.pn_message_set_priority(m.pn, C.uint8_t(b))
-}
-
-// SetTTL sets the time to live of the proton message. d is converted to a
-// whole number of milliseconds.
-func (m *message) SetTTL(d time.Duration) {
-	millis := C.pn_millis_t(d / time.Millisecond)
-	C.pn_message_set_ttl(m.pn, millis)
-}
-
-// SetFirstAcquirer sets the first-acquirer flag of the proton message.
-func (m *message) SetFirstAcquirer(b bool) {
-	C.pn_message_set_first_acquirer(m.pn, C.bool(b))
-}
-
-// SetDeliveryCount sets the delivery count of the proton message.
-func (m *message) SetDeliveryCount(c uint32) {
-	C.pn_message_set_delivery_count(m.pn, C.uint32_t(c))
-}
-
-// SetMessageId replaces the message id with the AMQP form of id.
-func (m *message) SetMessageId(id interface{}) {
-	setData(id, C.pn_message_id(m.pn))
-}
-
-// SetUserId sets the user id of the proton message to the bytes of s.
-func (m *message) SetUserId(s string) {
-	C.pn_message_set_user_id(m.pn, pnBytes([]byte(s)))
-}
-// SetAddress sets the address of the proton message. The C copy of s is
-// freed by the msg_set_str helper once the setter has been called.
-func (m *message) SetAddress(s string) {
-	setter := C.set_fn(C.pn_message_set_address)
-	C.msg_set_str(m.pn, C.CString(s), setter)
-}
-
-// SetSubject sets the subject of the proton message.
-func (m *message) SetSubject(s string) {
-	setter := C.set_fn(C.pn_message_set_subject)
-	C.msg_set_str(m.pn, C.CString(s), setter)
-}
-
-// SetReplyTo sets the reply-to address of the proton message.
-func (m *message) SetReplyTo(s string) {
-	setter := C.set_fn(C.pn_message_set_reply_to)
-	C.msg_set_str(m.pn, C.CString(s), setter)
-}
-
-// SetCorrelationId replaces the correlation id with the AMQP form of c.
-func (m *message) SetCorrelationId(c interface{}) {
-	setData(c, C.pn_message_correlation_id(m.pn))
-}
-
-// SetContentType sets the content type of the proton message.
-func (m *message) SetContentType(s string) {
-	setter := C.set_fn(C.pn_message_set_content_type)
-	C.msg_set_str(m.pn, C.CString(s), setter)
-}
-
-// SetContentEncoding sets the content encoding of the proton message.
-func (m *message) SetContentEncoding(s string) {
-	setter := C.set_fn(C.pn_message_set_content_encoding)
-	C.msg_set_str(m.pn, C.CString(s), setter)
-}
-// SetExpiryTime sets the expiry time of the proton message, converting t
-// with pnTime.
-func (m *message) SetExpiryTime(t time.Time) {
-	C.pn_message_set_expiry_time(m.pn, pnTime(t))
-}
-
-// SetCreationTime sets the creation time of the proton message, converting t
-// with pnTime.
-func (m *message) SetCreationTime(t time.Time) {
-	C.pn_message_set_creation_time(m.pn, pnTime(t))
-}
-
-// SetGroupId sets the group id of the proton message.
-func (m *message) SetGroupId(s string) {
-	setter := C.set_fn(C.pn_message_set_group_id)
-	C.msg_set_str(m.pn, C.CString(s), setter)
-}
-
-// SetGroupSequence sets the group sequence number of the proton message.
-func (m *message) SetGroupSequence(s int32) {
-	sequence := C.pn_sequence_t(s)
-	C.pn_message_set_group_sequence(m.pn, sequence)
-}
-
-// SetReplyToGroupId sets the reply-to group id of the proton message.
-func (m *message) SetReplyToGroupId(s string) {
-	setter := C.set_fn(C.pn_message_set_reply_to_group_id)
-	C.msg_set_str(m.pn, C.CString(s), setter)
-}
-
-// SetInstructions replaces the delivery instructions with the AMQP form of v.
-func (m *message) SetInstructions(v map[string]interface{}) {
-	instructions := C.pn_message_instructions(m.pn)
-	setData(v, instructions)
-}
-
-// SetAnnotations replaces the message annotations with the AMQP form of v.
-func (m *message) SetAnnotations(v map[string]interface{}) {
-	annotations := C.pn_message_annotations(m.pn)
-	setData(v, annotations)
-}
-
-// SetProperties replaces the application properties with the AMQP form of v.
-func (m *message) SetProperties(v map[string]interface{}) {
-	properties := C.pn_message_properties(m.pn)
-	setData(v, properties)
-}
-
-// Marshal replaces the message body with the AMQP form of v.
-func (m *message) Marshal(v interface{}) {
-	clearMarshal(v, C.pn_message_body(m.pn))
-}
-
-// Unmarshal rewinds the message body and unmarshals it into v.
-func (m *message) Unmarshal(v interface{}) {
-	rewindUnmarshal(v, C.pn_message_body(m.pn))
-}
-
-// Body returns the message body unmarshaled into an interface{}.
-func (m *message) Body() (v interface{}) {
-	m.Unmarshal(&v)
-	return v
-}
-
-// Decode clears the message and then decodes the AMQP bytes in data into it.
-// It returns an error when data is empty or when proton reports a decoding
-// failure; the message has already been cleared in both cases.
-func (m *message) Decode(data []byte) error {
-	m.Clear()
-	if len(data) == 0 {
-		return fmt.Errorf("empty buffer for decode")
-	}
-	status := C.pn_message_decode(m.pn, cPtr(data), cLen(data))
-	if status >= 0 {
-		return nil
-	}
-	return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn)))
-}
-
-// DecodeMessage creates a new message and decodes the AMQP bytes in data
-// into it. The new message is returned even when decoding fails, together
-// with the decoding error.
-func DecodeMessage(data []byte) (m Message, err error) {
-	m = NewMessage()
-	return m, m.Decode(data)
-}
-
-// Encode encodes the message as AMQP data into buffer, growing into a new
-// buffer through encodeGrow when buffer is nil, empty or too small. It
-// returns the buffer holding the encoded message, trimmed to the encoded
-// size.
-func (m *message) Encode(buffer []byte) ([]byte, error) {
-	// attempt makes one encoding attempt into buf, as encodeGrow requires.
-	attempt := func(buf []byte) ([]byte, error) {
-		size := cLen(buf) // in: space available, out: bytes written
-		status := C.pn_message_encode(m.pn, cPtr(buf), &size)
-		if status == C.PN_OVERFLOW {
-			return buf, overflow
-		}
-		if status < 0 {
-			return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(status))
-		}
-		return buf[:size], nil
-	}
-	return encodeGrow(buffer, attempt)
-}
-
-// TODO aconway 2015-09-14: Multi-section messages.
-
-// TODO aconway 2016-09-09: Message.String() use inspect.
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go
deleted file mode 100644
index 7a6e5a8..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/message_test.go
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-import (
- "testing"
- "time"
-)
-
-func roundTrip(m Message) error {
- buffer, err := m.Encode(nil)
- if err != nil {
- return err
- }
- m2, err := DecodeMessage(buffer)
- if err != nil {
- return err
- }
- return checkEqual(m, m2)
-}
-
-func TestDefaultMessage(t *testing.T) {
- m := NewMessage()
- // Check defaults
- for _, data := range [][]interface{}{
- {m.Inferred(), false},
- {m.Durable(), false},
- {m.Priority(), uint8(4)},
- {m.TTL(), time.Duration(0)},
- {m.UserId(), ""},
- {m.Address(), ""},
- {m.Subject(), ""},
- {m.ReplyTo(), ""},
- {m.ContentType(), ""},
- {m.ContentEncoding(), ""},
- {m.GroupId(), ""},
- {m.GroupSequence(), int32(0)},
- {m.ReplyToGroupId(), ""},
- {m.MessageId(), nil},
- {m.CorrelationId(), nil},
- {m.Instructions(), map[string]interface{}{}},
- {m.Annotations(), map[string]interface{}{}},
- {m.Properties(), map[string]interface{}{}},
- {m.Body(), nil},
- } {
- if err := checkEqual(data[0], data[1]); err != nil {
- t.Error(err)
- }
- }
- if err := roundTrip(m); err != nil {
- t.Error(err)
- }
-}
-
-func TestMessageRoundTrip(t *testing.T) {
- m := NewMessage()
- m.SetInferred(false)
- m.SetDurable(true)
- m.SetPriority(42)
- m.SetTTL(0)
- m.SetUserId("user")
- m.SetAddress("address")
- m.SetSubject("subject")
- m.SetReplyTo("replyto")
- m.SetContentType("content")
- m.SetContentEncoding("encoding")
- m.SetGroupId("group")
- m.SetGroupSequence(42)
- m.SetReplyToGroupId("replytogroup")
- m.SetMessageId("id")
- m.SetCorrelationId("correlation")
- m.SetInstructions(map[string]interface{}{"instructions": "foo"})
- m.SetAnnotations(map[string]interface{}{"annotations": "foo"})
- m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"})
- m.Marshal("hello")
-
- for _, data := range [][]interface{}{
- {m.Inferred(), false},
- {m.Durable(), true},
- {m.Priority(), uint8(42)},
- {m.TTL(), time.Duration(0)},
- {m.UserId(), "user"},
- {m.Address(), "address"},
- {m.Subject(), "subject"},
- {m.ReplyTo(), "replyto"},
- {m.ContentType(), "content"},
- {m.ContentEncoding(), "encoding"},
- {m.GroupId(), "group"},
- {m.GroupSequence(), int32(42)},
- {m.ReplyToGroupId(), "replytogroup"},
- {m.MessageId(), "id"},
- {m.CorrelationId(), "correlation"},
- {m.Instructions(), map[string]interface{}{"instructions": "foo"}},
- {m.Annotations(), map[string]interface{}{"annotations": "foo"}},
- {m.Properties(), map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}},
- {m.Body(), "hello"},
- } {
- if err := checkEqual(data[0], data[1]); err != nil {
- t.Error(err)
- }
- }
- if err := roundTrip(m); err != nil {
- t.Error(err)
- }
-}
-
-func TestMessageBodyTypes(t *testing.T) {
- var s string
- var body interface{}
- var i int64
-
- m := NewMessageWith(int64(42))
- m.Unmarshal(&body)
- m.Unmarshal(&i)
- if err := checkEqual(body.(int64), int64(42)); err != nil {
- t.Error(err)
- }
- if err := checkEqual(i, int64(42)); err != nil {
- t.Error(err)
- }
-
- m = NewMessageWith("hello")
- m.Unmarshal(&s)
- m.Unmarshal(&body)
- if err := checkEqual(s, "hello"); err != nil {
- t.Error(err)
- }
- if err := checkEqual(body.(string), "hello"); err != nil {
- t.Error(err)
- }
- if err := roundTrip(m); err != nil {
- t.Error(err)
- }
-
- m = NewMessageWith(Binary("bin"))
- m.Unmarshal(&s)
- m.Unmarshal(&body)
- if err := checkEqual(body.(Binary), Binary("bin")); err != nil {
- t.Error(err)
- }
- if err := checkEqual(s, "bin"); err != nil {
- t.Error(err)
- }
- if err := roundTrip(m); err != nil {
- t.Error(err)
- }
-
- // TODO aconway 2015-09-08: array etc.
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go
deleted file mode 100644
index 2852c23..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/types.go
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/codec.h>
-import "C"
-
-import (
- "bytes"
- "fmt"
- "reflect"
- "time"
- "unsafe"
-)
-
-func (t C.pn_type_t) String() string {
- switch C.pn_type_t(t) {
- case C.PN_NULL:
- return "null"
- case C.PN_BOOL:
- return "bool"
- case C.PN_UBYTE:
- return "ubyte"
- case C.PN_BYTE:
- return "byte"
- case C.PN_USHORT:
- return "ushort"
- case C.PN_SHORT:
- return "short"
- case C.PN_CHAR:
- return "char"
- case C.PN_UINT:
- return "uint"
- case C.PN_INT:
- return "int"
- case C.PN_ULONG:
- return "ulong"
- case C.PN_LONG:
- return "long"
- case C.PN_TIMESTAMP:
- return "timestamp"
- case C.PN_FLOAT:
- return "float"
- case C.PN_DOUBLE:
- return "double"
- case C.PN_DECIMAL32:
- return "decimal32"
- case C.PN_DECIMAL64:
- return "decimal64"
- case C.PN_DECIMAL128:
- return "decimal128"
- case C.PN_UUID:
- return "uuid"
- case C.PN_BINARY:
- return "binary"
- case C.PN_STRING:
- return "string"
- case C.PN_SYMBOL:
- return "symbol"
- case C.PN_DESCRIBED:
- return "described"
- case C.PN_ARRAY:
- return "array"
- case C.PN_LIST:
- return "list"
- case C.PN_MAP:
- return "map"
- default:
- return "no-data"
- }
-}
-
-// Go types
-var (
- bytesType = reflect.TypeOf([]byte{})
- valueType = reflect.TypeOf(reflect.Value{})
-)
-
-// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys.
-
-// Map is a generic map that can have mixed key and value types and so can represent any AMQP map
-type Map map[interface{}]interface{}
-
-// List is a generic list that can hold mixed values and can represent any AMQP list.
-//
-type List []interface{}
-
-// Symbol is a string that is encoded as an AMQP symbol
-type Symbol string
-
-func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
-
-// Binary is a string that is encoded as an AMQP binary.
-// It is a string rather than a byte[] because byte[] is not hashable and can't be used as
-// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte
-type Binary string
-
-func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
-
-// GoString for Map prints values with their types, useful for debugging.
-func (m Map) GoString() string {
- out := &bytes.Buffer{}
- fmt.Fprintf(out, "%T{", m)
- i := len(m)
- for k, v := range m {
- fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
- i--
- if i > 0 {
- fmt.Fprint(out, ", ")
- }
- }
- fmt.Fprint(out, "}")
- return out.String()
-}
-
-// GoString for List prints values with their types, useful for debugging.
-// Each element is rendered as Type(value); elements are separated by ", ".
-func (l List) GoString() string {
-	out := &bytes.Buffer{}
-	fmt.Fprintf(out, "%T{", l)
-	for i, v := range l {
-		// The separator goes between elements only, never after the last one.
-		if i > 0 {
-			fmt.Fprint(out, ", ")
-		}
-		fmt.Fprintf(out, "%T(%#v)", v, v)
-	}
-	fmt.Fprint(out, "}")
-	return out.String()
-}
-
-// pnTime converts Go time.Time to Proton millisecond Unix time.
-//
-// The sub-second part is taken from t.Nanosecond(), which is never negative,
-// so it combines correctly with t.Unix() for times before 1970 and does not
-// depend on t.UnixNano() being representable as an int64.
-func pnTime(t time.Time) C.pn_timestamp_t {
-	secs := t.Unix()
-	msecs := int64(t.Nanosecond()) / int64(time.Millisecond)
-	return C.pn_timestamp_t(secs*1000 + msecs)
-}
-
-// goTime converts a pn_timestamp_t to a Go time.Time.
-func goTime(t C.pn_timestamp_t) time.Time {
- secs := int64(t) / 1000
- nsecs := (int64(t) % 1000) * int64(time.Millisecond)
- return time.Unix(secs, nsecs)
-}
-
-func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
- if cBytes.start != nil {
- bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
- }
- return
-}
-
-func goString(cBytes C.pn_bytes_t) (str string) {
- if cBytes.start != nil {
- str = C.GoStringN(cBytes.start, C.int(cBytes.size))
- }
- return
-}
-
-func pnBytes(b []byte) C.pn_bytes_t {
- if len(b) == 0 {
- return C.pn_bytes_t{0, nil}
- } else {
- return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))}
- }
-}
-
-func cPtr(b []byte) *C.char {
- if len(b) == 0 {
- return nil
- }
- return (*C.char)(unsafe.Pointer(&b[0]))
-}
-
-func cLen(b []byte) C.size_t {
- return C.size_t(len(b))
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
deleted file mode 100644
index 8f380a7..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
+++ /dev/null
@@ -1,557 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/codec.h>
-import "C"
-
-import (
- "bytes"
- "fmt"
- "io"
- "reflect"
- "unsafe"
-)
-
-const minDecode = 1024
-
-// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
-type UnmarshalError struct {
- // The name of the AMQP type.
- AMQPType string
- // The Go type.
- GoType reflect.Type
-}
-
-func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
- return &UnmarshalError{C.pn_type_t(pnType).String(), reflect.TypeOf(v)}
-}
-
-// Error implements the error interface. The message distinguishes a target
-// that is not a pointer from an AMQP type that cannot be stored in the target.
-func (e UnmarshalError) Error() string {
-	if e.GoType.Kind() == reflect.Ptr {
-		return fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
-	}
-	return fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType)
-}
-
-func doRecover(err *error) {
- r := recover()
- switch r := r.(type) {
- case nil:
- case *UnmarshalError:
- *err = r
- default:
- panic(r)
- }
-}
-
-//
-// Decoding from a pn_data_t
-//
-// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
-// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
-//
-
-// Decoder decodes AMQP values from an io.Reader.
-//
-type Decoder struct {
- reader io.Reader
- buffer bytes.Buffer
-}
-
-// NewDecoder returns a new decoder that reads from r.
-//
-// The decoder has its own buffer and may read more data than required for the
-// AMQP values requested. Use Buffered to see if there is data left in the
-// buffer.
-//
-func NewDecoder(r io.Reader) *Decoder {
- return &Decoder{r, bytes.Buffer{}}
-}
-
-// Buffered returns a reader of the data remaining in the Decoder's buffer. The
-// reader is valid until the next call to Decode.
-//
-func (d *Decoder) Buffered() io.Reader {
- return bytes.NewReader(d.buffer.Bytes())
-}
-
-// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
-//
-// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
-//
-// If the Reader fails or is exhausted before a complete AMQP value has been
-// buffered, the read error (io.EOF at end of input) is returned.
-//
-func (d *Decoder) Decode(v interface{}) (err error) {
-	defer doRecover(&err)
-	data := C.pn_data(0)
-	defer C.pn_data_free(data)
-	var n int
-	for n == 0 {
-		n, err = decode(data, d.buffer.Bytes())
-		if err != nil {
-			return err
-		}
-		if n == 0 { // n == 0 means not enough data, read more
-			// Stop on a read error: the next decode() call would overwrite
-			// err, and at end of input the loop would never exit.
-			if err = d.more(); err != nil {
-				return err
-			}
-		} else {
-			unmarshal(v, data)
-		}
-	}
-	d.buffer.Next(n)
-	return
-}
-
-/*
-Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
-Types are converted as follows:
-
- +---------------------------+----------------------------------------------------------------------+
- |To Go types |From AMQP types |
- +===========================+======================================================================+
- |bool |bool |
- +---------------------------+----------------------------------------------------------------------+
- |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long. |
- |int32, int64 | |
- +---------------------------+----------------------------------------------------------------------+
- |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, |
- |uint32, uint64 types |ulong |
- +---------------------------+----------------------------------------------------------------------+
- |float32, float64 |Equivalent or smaller float or double. |
- +---------------------------+----------------------------------------------------------------------+
- |string, []byte |string, symbol or binary. |
- +---------------------------+----------------------------------------------------------------------+
- |Symbol |symbol |
- +---------------------------+----------------------------------------------------------------------+
- |map[K]T |map, provided all keys and values can unmarshal to types K, T |
- +---------------------------+----------------------------------------------------------------------+
- |Map |map, any AMQP map |
- +---------------------------+----------------------------------------------------------------------+
- |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: |
- | +------------------------+---------------------------------------------+
- | |AMQP Type |Go Type in interface{} |
- | +========================+=============================================+
- | |bool |bool |
- | +------------------------+---------------------------------------------+
- | |byte,short,int,long |int8,int16,int32,int64 |
- | +------------------------+---------------------------------------------+
- | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 |
- | +------------------------+---------------------------------------------+
- | |float, double |float32, float64 |
- | +------------------------+---------------------------------------------+
- | |string |string |
- | +------------------------+---------------------------------------------+
- | |symbol |Symbol |
- | +------------------------+---------------------------------------------+
- | |binary |Binary |
- | +------------------------+---------------------------------------------+
- | |null |nil |
- | +------------------------+---------------------------------------------+
- | |map |Map |
- | +------------------------+---------------------------------------------+
- | |list |List |
- +---------------------------+------------------------+---------------------------------------------+
-
-The following Go types cannot be unmarshaled: uintptr, function, interface, channel.
-
-TODO
-
-Go types: array, struct.
-
-AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies.
-
-AMQP maps with mixed/unhashable key types need an alternate representation.
-
-Described types.
-*/
-func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
-	// Decoding errors are raised as panics by unmarshal; doRecover turns an
-	// *UnmarshalError panic back into the returned err.
-	defer doRecover(&err)
-
-	data := C.pn_data(0)
-	defer C.pn_data_free(data)
-	if n, err = decode(data, bytes); err != nil {
-		return 0, err
-	}
-	// decode reports 0 when bytes does not hold a complete value.
-	if n == 0 {
-		return 0, fmt.Errorf("not enough data")
-	}
-	unmarshal(v, data)
-	return n, nil
-}
-
-// more reads more data when we can't parse a complete AMQP type
-func (d *Decoder) more() error {
- var readSize int64 = minDecode
- if int64(d.buffer.Len()) > readSize { // Grow by doubling
- readSize = int64(d.buffer.Len())
- }
- var n int64
- n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
- if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
- err = io.EOF
- }
- return err
-}
-
-// Unmarshal from data into value pointed at by v.
-//
-// The Go type of v selects which AMQP types are accepted. Any other AMQP
-// type, or a v that is not a supported pointer type, panics with an
-// *UnmarshalError. After the value has been read, an error reported by
-// dataError for data is raised as a panic as well.
-func unmarshal(v interface{}, data *C.pn_data_t) {
-	pnType := C.pn_data_type(data)
-	switch v := v.(type) {
-	case *bool:
-		switch pnType {
-		case C.PN_BOOL:
-			*v = bool(C.pn_data_get_bool(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-	case *int8:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = int8(C.pn_data_get_char(data))
-		case C.PN_BYTE:
-			*v = int8(C.pn_data_get_byte(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-	case *uint8:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = uint8(C.pn_data_get_char(data))
-		case C.PN_UBYTE:
-			*v = uint8(C.pn_data_get_ubyte(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-	case *int16:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = int16(C.pn_data_get_char(data))
-		case C.PN_BYTE:
-			*v = int16(C.pn_data_get_byte(data))
-		case C.PN_SHORT:
-			*v = int16(C.pn_data_get_short(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-	case *uint16:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = uint16(C.pn_data_get_char(data))
-		case C.PN_UBYTE:
-			*v = uint16(C.pn_data_get_ubyte(data))
-		case C.PN_USHORT:
-			*v = uint16(C.pn_data_get_ushort(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-	case *int32:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = int32(C.pn_data_get_char(data))
-		case C.PN_BYTE:
-			*v = int32(C.pn_data_get_byte(data))
-		case C.PN_SHORT:
-			*v = int32(C.pn_data_get_short(data))
-		case C.PN_INT:
-			*v = int32(C.pn_data_get_int(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-	case *uint32:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = uint32(C.pn_data_get_char(data))
-		case C.PN_UBYTE:
-			*v = uint32(C.pn_data_get_ubyte(data))
-		case C.PN_USHORT:
-			*v = uint32(C.pn_data_get_ushort(data))
-		case C.PN_UINT:
-			*v = uint32(C.pn_data_get_uint(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *int64:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = int64(C.pn_data_get_char(data))
-		case C.PN_BYTE:
-			*v = int64(C.pn_data_get_byte(data))
-		case C.PN_SHORT:
-			*v = int64(C.pn_data_get_short(data))
-		case C.PN_INT:
-			*v = int64(C.pn_data_get_int(data))
-		case C.PN_LONG:
-			*v = int64(C.pn_data_get_long(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *uint64:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = uint64(C.pn_data_get_char(data))
-		case C.PN_UBYTE:
-			*v = uint64(C.pn_data_get_ubyte(data))
-		case C.PN_USHORT:
-			*v = uint64(C.pn_data_get_ushort(data))
-		case C.PN_UINT:
-			// AMQP uint is a smaller unsigned type; accept it here just as
-			// the *uint32 and *uint targets do.
-			*v = uint64(C.pn_data_get_uint(data))
-		case C.PN_ULONG:
-			*v = uint64(C.pn_data_get_ulong(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *int:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = int(C.pn_data_get_char(data))
-		case C.PN_BYTE:
-			*v = int(C.pn_data_get_byte(data))
-		case C.PN_SHORT:
-			*v = int(C.pn_data_get_short(data))
-		case C.PN_INT:
-			*v = int(C.pn_data_get_int(data))
-		case C.PN_LONG:
-			// A long only fits when Go's int is 8 bytes wide.
-			if unsafe.Sizeof(0) == 8 {
-				*v = int(C.pn_data_get_long(data))
-			} else {
-				panic(newUnmarshalError(pnType, v))
-			}
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *uint:
-		switch pnType {
-		case C.PN_CHAR:
-			*v = uint(C.pn_data_get_char(data))
-		case C.PN_UBYTE:
-			*v = uint(C.pn_data_get_ubyte(data))
-		case C.PN_USHORT:
-			*v = uint(C.pn_data_get_ushort(data))
-		case C.PN_UINT:
-			*v = uint(C.pn_data_get_uint(data))
-		case C.PN_ULONG:
-			// A ulong only fits when Go's int/uint is 8 bytes wide.
-			if unsafe.Sizeof(0) == 8 {
-				*v = uint(C.pn_data_get_ulong(data))
-			} else {
-				panic(newUnmarshalError(pnType, v))
-			}
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *float32:
-		switch pnType {
-		case C.PN_FLOAT:
-			*v = float32(C.pn_data_get_float(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *float64:
-		switch pnType {
-		case C.PN_FLOAT:
-			*v = float64(C.pn_data_get_float(data))
-		case C.PN_DOUBLE:
-			*v = float64(C.pn_data_get_double(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *string:
-		switch pnType {
-		case C.PN_STRING:
-			*v = goString(C.pn_data_get_string(data))
-		case C.PN_SYMBOL:
-			*v = goString(C.pn_data_get_symbol(data))
-		case C.PN_BINARY:
-			*v = goString(C.pn_data_get_binary(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *[]byte:
-		switch pnType {
-		case C.PN_STRING:
-			*v = goBytes(C.pn_data_get_string(data))
-		case C.PN_SYMBOL:
-			*v = goBytes(C.pn_data_get_symbol(data))
-		case C.PN_BINARY:
-			*v = goBytes(C.pn_data_get_binary(data))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *Binary:
-		switch pnType {
-		case C.PN_BINARY:
-			*v = Binary(goBytes(C.pn_data_get_binary(data)))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *Symbol:
-		switch pnType {
-		case C.PN_SYMBOL:
-			*v = Symbol(goBytes(C.pn_data_get_symbol(data)))
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-
-	case *interface{}:
-		getInterface(data, v)
-
-	default:
-		// Remaining targets are handled by reflection: pointers to maps and
-		// slices are supported, everything else is rejected.
-		if reflect.TypeOf(v).Kind() != reflect.Ptr {
-			panic(newUnmarshalError(pnType, v))
-		}
-		switch reflect.TypeOf(v).Elem().Kind() {
-		case reflect.Map:
-			getMap(data, v)
-		case reflect.Slice:
-			getList(data, v)
-		default:
-			panic(newUnmarshalError(pnType, v))
-		}
-	}
-	err := dataError("unmarshaling", data)
-	if err != nil {
-		panic(err)
-	}
-}
-
-func rewindUnmarshal(v interface{}, data *C.pn_data_t) {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(v, data)
-}
-
-// Getting into an interface is driven completely by the AMQP type, since the interface{}
-// target is type-neutral.
-func getInterface(data *C.pn_data_t, v *interface{}) {
- pnType := C.pn_data_type(data)
- switch pnType {
- case C.PN_BOOL:
- *v = bool(C.pn_data_get_bool(data))
- case C.PN_UBYTE:
- *v = uint8(C.pn_data_get_ubyte(data))
- case C.PN_BYTE:
- *v = int8(C.pn_data_get_byte(data))
- case C.PN_USHORT:
- *v = uint16(C.pn_data_get_ushort(data))
- case C.PN_SHORT:
- *v = int16(C.pn_data_get_short(data))
- case C.PN_UINT:
- *v = uint32(C.pn_data_get_uint(data))
- case C.PN_INT:
- *v = int32(C.pn_data_get_int(data))
- case C.PN_CHAR:
- *v = uint8(C.pn_data_get_char(data))
- case C.PN_ULONG:
- *v = uint64(C.pn_data_get_ulong(data))
- case C.PN_LONG:
- *v = int64(C.pn_data_get_long(data))
- case C.PN_FLOAT:
- *v = float32(C.pn_data_get_float(data))
- case C.PN_DOUBLE:
- *v = float64(C.pn_data_get_double(data))
- case C.PN_BINARY:
- *v = Binary(goBytes(C.pn_data_get_binary(data)))
- case C.PN_STRING:
- *v = goString(C.pn_data_get_string(data))
- case C.PN_SYMBOL:
- *v = Symbol(goString(C.pn_data_get_symbol(data)))
- case C.PN_MAP:
- m := make(Map)
- unmarshal(&m, data)
- *v = m
- case C.PN_LIST:
- l := make(List, 0)
- unmarshal(&l, data)
- *v = l
- default: // No data (-1 or NULL)
- *v = nil
- }
-}
-
-// get into map pointed at by v
-func getMap(data *C.pn_data_t, v interface{}) {
- mapValue := reflect.ValueOf(v).Elem()
- mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
- switch pnType := C.pn_data_type(data); pnType {
- case C.PN_MAP:
- count := int(C.pn_data_get_map(data))
- if bool(C.pn_data_enter(data)) {
- defer C.pn_data_exit(data)
- for i := 0; i < count/2; i++ {
- if bool(C.pn_data_next(data)) {
- key := reflect.New(mapValue.Type().Key())
- unmarshal(key.Interface(), data)
- if bool(C.pn_data_next(data)) {
- val := reflect.New(mapValue.Type().Elem())
- unmarshal(val.Interface(), data)
- mapValue.SetMapIndex(key.Elem(), val.Elem())
- }
- }
- }
- }
- default: // Empty/error/unknown, leave map empty
- }
-}
-
-func getList(data *C.pn_data_t, v interface{}) {
- pnType := C.pn_data_type(data)
- if pnType != C.PN_LIST {
- panic(newUnmarshalError(pnType, v))
- }
- count := int(C.pn_data_get_list(data))
- listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
- if bool(C.pn_data_enter(data)) {
- for i := 0; i < count; i++ {
- if bool(C.pn_data_next(data)) {
- val := reflect.New(listValue.Type().Elem())
- unmarshal(val.Interface(), data)
- listValue.Index(i).Set(val.Elem())
- }
- }
- C.pn_data_exit(data)
- }
- reflect.ValueOf(v).Elem().Set(listValue)
-}
-
-// decode from bytes.
-// Return bytes decoded or 0 if we could not decode a complete object.
-//
-func decode(data *C.pn_data_t, bytes []byte) (int, error) {
- if len(bytes) == 0 {
- return 0, nil
- }
- n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
- if n == int(C.PN_UNDERFLOW) {
- C.pn_error_clear(C.pn_data_error(data))
- return 0, nil
- } else if n <= 0 {
- return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n))
- }
- return n, nil
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go
deleted file mode 100644
index fd6c8dc..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/url.go
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-import (
- "errors"
- "net"
- "net/url"
- "strings"
-)
-
-const (
- amqp string = "amqp"
- amqps = "amqps"
- defaulthost = "localhost"
-)
-
-// splitHostPort splits hostport into host and port, allowing the port to be
-// absent. The way this is used it can only get a hostport already validated
-// by the URL parser, so some error checks are skipped.
-func splitHostPort(hostport string) (string, string, error) {
-	switch {
-	case hostport == "":
-		return "", "", nil
-	case hostport[0] == '[':
-		// Bracketed host with no port: the last ']' is the final character,
-		// so return what lies between the brackets.
-		if end := strings.LastIndex(hostport, "]"); end == len(hostport)-1 {
-			return hostport[1:end], "", nil
-		}
-	case strings.IndexByte(hostport, ':') < 0:
-		// No colon at all, so the whole string is the host.
-		return hostport, "", nil
-	}
-	return net.SplitHostPort(hostport)
-}
-
-// UpdateURL fills in AMQP defaults on in, in place: an empty scheme becomes
-// "amqp", an empty host becomes "localhost" and an empty port becomes the
-// scheme name. It returns an error, leaving in unmodified, if the URL cannot
-// be re-parsed, the scheme is neither amqp nor amqps, or the host and port
-// cannot be split.
-func UpdateURL(in *url.URL) (err error) {
-	// Work on a copy so that in is only overwritten on success.
-	u := new(url.URL)
-	*u = *in
-	// Detect the form without "amqp://" and stick it on the front to make it
-	// match the usual proton defaults.
-	missingPrefix := (u.Scheme != "" && u.Opaque != "") ||
-		(u.Scheme == "" && u.Host == "")
-	if missingPrefix {
-		if u, err = url.Parse("amqp://" + u.String()); err != nil {
-			return
-		}
-	}
-	// If Scheme is still "" then default to amqp
-	if u.Scheme == "" {
-		u.Scheme = amqp
-	}
-	// Error if the scheme is not an amqp scheme
-	if u.Scheme != amqp && u.Scheme != amqps {
-		return errors.New("invalid amqp scheme")
-	}
-	// Decompose Host into host and port, defaulting whichever is empty
-	host, port, err := splitHostPort(u.Host)
-	if err != nil {
-		return
-	}
-	if host == "" {
-		host = defaulthost
-	}
-	if port == "" {
-		port = u.Scheme
-	}
-	u.Host = net.JoinHostPort(host, port)
-	*in = *u
-	return nil
-}
-
-// ParseURL parses an AMQP URL string and returns a net/url.URL.
-//
-// It is more forgiving than net/url.Parse and allows most of the parts of the
-// URL to be missing, assuming AMQP defaults. The returned URL is nil whenever
-// UpdateURL reports an error.
-//
-func ParseURL(s string) (u *url.URL, err error) {
-	u, err = url.Parse(s)
-	if err != nil {
-		return
-	}
-	if err = UpdateURL(u); err != nil {
-		return nil, err
-	}
-	return u, nil
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go
deleted file mode 100644
index f52d4bf..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/url_test.go
+++ /dev/null
@@ -1,59 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-import (
- "fmt"
-)
-
-func ExampleParseURL() {
- for _, s := range []string{
- "amqp://username:password@host:1234/path",
- "host:1234",
- "host",
- "host/path",
- "amqps://host",
- "/path",
- "",
- ":1234",
- // Taken out because the go 1.4 URL parser isn't the same as later
- //"[::1]",
- //"[::1",
- // Output would be:
- // amqp://[::1]:amqp
- // parse amqp://[::1: missing ']' in host
- } {
- u, err := ParseURL(s)
- if err != nil {
- fmt.Println(err)
- } else {
- fmt.Println(u)
- }
- }
- // Output:
- // amqp://username:password@host:1234/path
- // amqp://host:1234
- // amqp://host:amqp
- // amqp://host:amqp/path
- // amqps://host:amqps
- // amqp://localhost:amqp/path
- // amqp://localhost:amqp
- // parse :1234: missing protocol scheme
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/amqp/version.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/version.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/version.go
deleted file mode 100644
index cefa904..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/version.go
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// Version check for proton library.
-// Done here because this is the lowest-level dependency for all the proton Go packages.
-
-// #include <proton/version.h>
-// #if PN_VERSION_MINOR < 10
-// #error packages qpid.apache.org/... require Proton-C library version 0.10 or greater
-// #endif
-import "C"
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
deleted file mode 100644
index 73a9299..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
+++ /dev/null
@@ -1,124 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package electron
-
-import (
- "fmt"
- "io/ioutil"
- "os"
- "os/exec"
- "path/filepath"
- "strings"
- "testing"
-)
-
-func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) {
- client, server := newClientServerOpts(t, copts, sopts)
- defer closeClientServer(client, server)
-
- go func() {
- for in := range server.Incoming() {
- switch in := in.(type) {
- case *IncomingConnection:
- got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()}
- }
- in.Accept()
- }
- }()
-
- err = client.Sync()
- return
-}
-
-func TestAuthAnonymous(t *testing.T) {
- fatalIf(t, configureSASL())
- got, err := testAuthClientServer(t,
- []ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
- []ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
- fatalIf(t, err)
- errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got))
-}
-
-func TestAuthPlain(t *testing.T) {
- fatalIf(t, configureSASL())
- got, err := testAuthClientServer(t,
- []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
- []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
- fatalIf(t, err)
- errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
-}
-
-func TestAuthBadPass(t *testing.T) {
- fatalIf(t, configureSASL())
- _, err := testAuthClientServer(t,
- []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
- []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
- if err == nil {
- t.Error("Expected auth failure for bad pass")
- }
-}
-
-func TestAuthBadUser(t *testing.T) {
- fatalIf(t, configureSASL())
- _, err := testAuthClientServer(t,
- []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
- []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
- if err == nil {
- t.Error("Expected auth failure for bad user")
- }
-}
-
-var confDir string
-var confErr error
-
-func configureSASL() error {
- if confDir != "" || confErr != nil {
- return confErr
- }
- confDir, confErr = ioutil.TempDir("", "")
- if confErr != nil {
- return confErr
- }
-
- GlobalSASLConfigDir(confDir)
- GlobalSASLConfigName("test")
- conf := filepath.Join(confDir, "test.conf")
-
- db := filepath.Join(confDir, "proton.sasldb")
- cmd := exec.Command("saslpasswd2", "-c", "-p", "-f", db, "-u", "proton", "fred")
- cmd.Stdin = strings.NewReader("xxx") // Password
- if out, err := cmd.CombinedOutput(); err != nil {
- confErr = fmt.Errorf("saslpasswd2 failed: %s\n%s", err, out)
- return confErr
- }
- confStr := "sasldb_path: " + db + "\nmech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS\n"
- if err := ioutil.WriteFile(conf, []byte(confStr), os.ModePerm); err != nil {
- confErr = fmt.Errorf("write conf file %s failed: %s", conf, err)
- }
- return confErr
-}
-
-func TestMain(m *testing.M) {
- status := m.Run()
- if confDir != "" {
- _ = os.RemoveAll(confDir)
- }
- os.Exit(status)
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
deleted file mode 100644
index 7f3050f..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ /dev/null
@@ -1,405 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package electron
-
-// #include <proton/disposition.h>
-import "C"
-
-import (
- "net"
- "qpid.apache.org/proton"
- "sync"
- "time"
-)
-
-// Settings associated with a Connection.
-type ConnectionSettings interface {
- // Authenticated user name associated with the connection.
- User() string
-
- // The AMQP virtual host name for the connection.
- //
- // Optional, useful when the server has multiple names and provides different
- // service based on the name the client uses to connect.
- //
- // By default it is set to the DNS host name that the client uses to connect,
- // but it can be set to something different at the client side with the
- // VirtualHost() option.
- //
- // Returns error if the connection fails to authenticate.
- VirtualHost() string
-
- // Heartbeat is the maximum delay between sending frames that the remote peer
- // has requested of us. If the interval expires an empty "heartbeat" frame
- // will be sent automatically to keep the connection open.
- Heartbeat() time.Duration
-}
-
-// Connection is an AMQP connection, created by a Container.
-type Connection interface {
- Endpoint
- ConnectionSettings
-
- // Sender opens a new sender on the DefaultSession.
- Sender(...LinkOption) (Sender, error)
-
- // Receiver opens a new Receiver on the DefaultSession().
- Receiver(...LinkOption) (Receiver, error)
-
- // DefaultSession() returns a default session for the connection. It is opened
- // on the first call to DefaultSession and returned on subsequent calls.
- DefaultSession() (Session, error)
-
- // Session opens a new session.
- Session(...SessionOption) (Session, error)
-
- // Container for the connection.
- Container() Container
-
- // Disconnect the connection abruptly with an error.
- Disconnect(error)
-
- // Wait waits for the connection to be disconnected.
- Wait() error
-
- // WaitTimeout is like Wait but returns Timeout if the timeout expires.
- WaitTimeout(time.Duration) error
-
- // Incoming returns a channel for incoming endpoints opened by the remote peer.
- // See the Incoming interface for more.
- //
- // Not receiving from Incoming() and calling Accept/Reject will block the
- // electron event loop. You should run a loop to handle the types that
- // interest you in a switch{} and Accept() all others.
- Incoming() <-chan Incoming
-}
-
-type connectionSettings struct {
- user, virtualHost string
- heartbeat time.Duration
-}
-
-func (c connectionSettings) User() string { return c.user }
-func (c connectionSettings) VirtualHost() string { return c.virtualHost }
-func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
-
-// ConnectionOption can be passed when creating a connection to configure various options
-type ConnectionOption func(*connection)
-
-// User returns a ConnectionOption that sets the user name for a connection.
-func User(user string) ConnectionOption {
- return func(c *connection) {
- c.user = user
- c.pConnection.SetUser(user)
- }
-}
-
-// VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection.
-// Only applies to outbound client connection.
-func VirtualHost(virtualHost string) ConnectionOption {
- return func(c *connection) {
- c.virtualHost = virtualHost
- c.pConnection.SetHostname(virtualHost)
- }
-}
-
-// Password returns a ConnectionOption to set the password used to establish a
-// connection. Only applies to outbound client connection.
-//
-// The connection will erase its copy of the password from memory as soon as it
-// has been used to authenticate. If you are concerned about passwords staying in
-// memory you should never store them as strings, and should overwrite your
-// copy as soon as you are done with it.
-//
-func Password(password []byte) ConnectionOption {
- return func(c *connection) { c.pConnection.SetPassword(password) }
-}
-
-// Server returns a ConnectionOption to put the connection in server mode for incoming connections.
-//
-// A server connection will do protocol negotiation to accept an incoming AMQP
-// connection. Normally you would call this for a connection created by
-// net.Listener.Accept()
-//
-func Server() ConnectionOption {
- return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) }
-}
-
-// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
-// Connection.Incoming(). This is automatically set for Server() connections.
-func AllowIncoming() ConnectionOption {
- return func(c *connection) { c.incoming = make(chan Incoming) }
-}
-
-// Parent returns a ConnectionOption that associates the Connection with its Container.
-// If not set a connection will create its own default container.
-func Parent(cont Container) ConnectionOption {
- return func(c *connection) { c.container = cont.(*container) }
-}
-
-type connection struct {
- endpoint
- connectionSettings
-
- defaultSessionOnce, closeOnce sync.Once
-
- container *container
- conn net.Conn
- server bool
- incoming chan Incoming
- handler *handler
- engine *proton.Engine
- pConnection proton.Connection
-
- defaultSession Session
-}
-
-// NewConnection creates a connection with the given options.
-func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) {
- c := &connection{
- conn: conn,
- }
- c.handler = newHandler(c)
- var err error
- c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
- if err != nil {
- return nil, err
- }
- c.pConnection = c.engine.Connection()
- for _, set := range opts {
- set(c)
- }
- if c.container == nil {
- c.container = NewContainer("").(*container)
- }
- c.pConnection.SetContainer(c.container.Id())
- globalSASLInit(c.engine)
-
- c.endpoint.init(c.engine.String())
- go c.run()
- return c, nil
-}
-
-func (c *connection) run() {
- if !c.server {
- c.pConnection.Open()
- }
- _ = c.engine.Run()
- if c.incoming != nil {
- close(c.incoming)
- }
- _ = c.closed(Closed)
-}
-
-func (c *connection) Close(err error) {
- c.err.Set(err)
- c.engine.Close(err)
-}
-
-func (c *connection) Disconnect(err error) {
- c.err.Set(err)
- c.engine.Disconnect(err)
-}
-
-func (c *connection) Session(opts ...SessionOption) (Session, error) {
- var s Session
- err := c.engine.InjectWait(func() error {
- if c.Error() != nil {
- return c.Error()
- }
- pSession, err := c.engine.Connection().Session()
- if err == nil {
- pSession.Open()
- if err == nil {
- s = newSession(c, pSession, opts...)
- }
- }
- return err
- })
- return s, err
-}
-
-func (c *connection) Container() Container { return c.container }
-
-func (c *connection) DefaultSession() (s Session, err error) {
- c.defaultSessionOnce.Do(func() {
- c.defaultSession, err = c.Session()
- })
- if err == nil {
- err = c.Error()
- }
- return c.defaultSession, err
-}
-
-func (c *connection) Sender(opts ...LinkOption) (Sender, error) {
- if s, err := c.DefaultSession(); err == nil {
- return s.Sender(opts...)
- } else {
- return nil, err
- }
-}
-
-func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) {
- if s, err := c.DefaultSession(); err == nil {
- return s.Receiver(opts...)
- } else {
- return nil, err
- }
-}
-
-func (c *connection) Connection() Connection { return c }
-
-func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
-func (c *connection) WaitTimeout(timeout time.Duration) error {
- _, err := timedReceive(c.done, timeout)
- if err == Timeout {
- return Timeout
- }
- return c.Error()
-}
-
-func (c *connection) Incoming() <-chan Incoming {
- assert(c.incoming != nil, "electron.Connection.Incoming() disabled for %s", c)
- return c.incoming
-}
-
-type IncomingConnection struct {
- incoming
- connectionSettings
- c *connection
-}
-
-func newIncomingConnection(c *connection) *IncomingConnection {
- c.user = c.pConnection.Transport().User()
- c.virtualHost = c.pConnection.RemoteHostname()
- return &IncomingConnection{
- incoming: makeIncoming(c.pConnection),
- connectionSettings: c.connectionSettings,
- c: c}
-}
-
-// AcceptConnection is like Accept() but takes ConnectionOptions.
-// For example you can set the Heartbeat() for the accepted connection.
-func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection {
- return in.accept(func() Endpoint {
- for _, opt := range opts {
- opt(in.c)
- }
- in.c.pConnection.Open()
- return in.c
- }).(Connection)
-}
-
-func (in *IncomingConnection) Accept() Endpoint {
- return in.AcceptConnection()
-}
-
-func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
-
-// SASLEnable returns a ConnectionOption that enables SASL authentication.
-// Only required if you don't set any other SASL options.
-func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } }
-
-// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL
-// mechanisms.
-//
-// Can be used on the client or the server to restrict the SASL mechanisms for a connection.
-// mechs is a space-separated list of mechanism names.
-//
-func SASLAllowedMechs(mechs string) ConnectionOption {
- return func(c *connection) { sasl(c).AllowedMechs(mechs) }
-}
-
-// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear
-// text SASL authentication mechanisms
-//
-// By default the SASL layer is configured not to allow mechanisms that disclose
-// the clear text of the password over an unencrypted AMQP connection. This specifically
-// will disallow the use of the PLAIN mechanism without using SSL encryption.
-//
-// This default is to avoid disclosing password information accidentally over an
-// insecure network.
-//
-func SASLAllowInsecure(b bool) ConnectionOption {
- return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
-}
-
-// Heartbeat returns a ConnectionOption that requests the maximum delay
-// between sending frames for the remote peer. If we don't receive any frames
-// within 2*delay we will close the connection.
-//
-func Heartbeat(delay time.Duration) ConnectionOption {
- // Proton-C divides the idle-timeout by 2 before sending, so compensate.
- return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * delay) }
-}
-
-// GlobalSASLConfigDir sets the SASL configuration directory for every
-// Connection created in this process. If not called, the default is determined
-// by your SASL installation.
-//
-// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
-//
-func GlobalSASLConfigDir(dir string) { globalSASLConfigDir = dir }
-
-// GlobalSASLConfigName sets the SASL configuration name for every Connection
-// created in this process. If not called the default is "proton-server".
-//
-// The complete configuration file name is
-// <sasl-config-dir>/<sasl-config-name>.conf
-//
-// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
-//
-func GlobalSASLConfigName(dir string) { globalSASLConfigName = dir }
-
-var (
- globalSASLConfigName string
- globalSASLConfigDir string
-)
-
-// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we
-// can realistically offer is global configuration. Later if/when the pn_sasl C
-// impl is fixed we can offer per connection over-rides.
-func globalSASLInit(eng *proton.Engine) {
- sasl := eng.Transport().SASL()
- if globalSASLConfigName != "" {
- sasl.ConfigName(globalSASLConfigName)
- }
- if globalSASLConfigDir != "" {
- sasl.ConfigPath(globalSASLConfigDir)
- }
-}
-
-// Dial is shorthand for using net.Dial() then NewConnection()
-func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) {
- conn, err := net.Dial(network, addr)
- if err == nil {
- c, err = NewConnection(conn, opts...)
- }
- return
-}
-
-// DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
-func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) {
- conn, err := dialer.Dial(network, addr)
- if err == nil {
- c, err = NewConnection(conn, opts...)
- }
- return
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
deleted file mode 100644
index efb24ff..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package electron
-
-import (
- "net"
- "qpid.apache.org/proton"
- "strconv"
- "sync/atomic"
-)
-
-// Container is an AMQP container, it represents a single AMQP "application"
-// which can have multiple client or server connections.
-//
-// Each Container in a distributed AMQP application must have a unique
-// container-id which is applied to its connections.
-//
-// Create with NewContainer()
-//
-type Container interface {
- // Id is a unique identifier for the container in your distributed application.
- Id() string
-
- // Connection creates a connection associated with this container.
- Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error)
-
- // Dial is shorthand for
- // conn, err := net.Dial(); c, err := Connection(conn, opts...)
- Dial(network string, addr string, opts ...ConnectionOption) (Connection, error)
-
- // Accept is shorthand for:
- // conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server())...)
- Accept(l net.Listener, opts ...ConnectionOption) (Connection, error)
-
- // String returns Id()
- String() string
-}
-
-type container struct {
- id string
- tagCounter uint64
-}
-
-func (cont *container) nextTag() string {
- return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32)
-}
-
-// NewContainer creates a new container. The id must be unique in your
-// distributed application, all connections created by the container
-// will have this container-id.
-//
-// If id == "" a random UUID will be generated for the id.
-func NewContainer(id string) Container {
- if id == "" {
- id = proton.UUID4().String()
- }
- cont := &container{id: id}
- return cont
-}
-
-func (cont *container) Id() string { return cont.id }
-
-func (cont *container) String() string { return cont.Id() }
-
-func (cont *container) nextLinkName() string {
- return cont.id + "@" + cont.nextTag()
-}
-
-func (cont *container) Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) {
- return NewConnection(conn, append(opts, Parent(cont))...)
-}
-
-func (cont *container) Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) {
- conn, err := net.Dial(network, address)
- if err == nil {
- c, err = cont.Connection(conn, opts...)
- }
- return
-}
-
-func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c Connection, err error) {
- conn, err := l.Accept()
- if err == nil {
- c, err = cont.Connection(conn, append(opts, Server())...)
- }
- return
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org