You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/23 18:58:18 UTC
[50/50] [abbrv] qpid-proton git commit: Merge branch 'master' into go1
Merge branch 'master' into go1
# Conflicts:
# readme-branch.md
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9e788b2d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9e788b2d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9e788b2d
Branch: refs/heads/go1
Commit: 9e788b2d484a79154d9d05a93bd6447730ab7738
Parents: e1a83ee 1cfa056
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Nov 23 12:36:33 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 23 12:52:44 2015 -0500
----------------------------------------------------------------------
amqp/error.go | 39 ++-
amqp/interop | 1 -
amqp/marshal.go | 10 +-
amqp/message.go | 9 +-
amqp/types.go | 6 +-
amqp/unmarshal.go | 41 +--
amqp/url.go | 6 +-
amqp/url_test.go | 2 +-
electron/connection.go | 128 ++++++----
electron/container.go | 20 +-
electron/doc.go | 14 +-
electron/endpoint.go | 36 ++-
electron/error.go | 35 +++
electron/handler.go | 97 ++++---
electron/link.go | 86 ++++---
electron/messaging_test.go | 182 +++++++-------
electron/receiver.go | 60 +++--
electron/sender.go | 375 ++++++++++++---------------
electron/session.go | 73 +++---
electron/time.go | 13 +-
internal/error.go | 118 ---------
internal/flexchannel.go | 82 ------
internal/flexchannel_test.go | 89 -------
internal/safemap.go | 57 -----
internal/uuid.go | 70 ------
proton/engine.go | 27 +-
proton/error.go | 104 ++++----
proton/handlers.go | 7 +-
proton/interop_test.go | 290 ---------------------
proton/marshal.go | 210 ----------------
proton/message.go | 29 ++-
proton/types.go | 151 -----------
proton/unfinished.go | 53 ----
proton/unmarshal.go | 517 --------------------------------------
proton/url.go | 96 -------
proton/url_test.go | 51 ----
proton/uuid.go | 57 +++++
proton/wrappers.go | 39 +--
proton/wrappers_gen.go | 9 +-
readme-go-get.md | 18 ++
40 files changed, 873 insertions(+), 2434 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/error.go
----------------------------------------------------------------------
diff --cc amqp/error.go
index 868dbf3,0000000..349fc41
mode 100644,000000..100644
--- a/amqp/error.go
+++ b/amqp/error.go
@@@ -1,66 -1,0 +1,103 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
++// #include <proton/error.h>
++import "C"
++
+import (
+ "fmt"
+ "reflect"
+)
+
+// Error is an AMQP error condition. It has a name and a description.
+// It implements the Go error interface so can be returned as an error value.
+//
+// You can pass amqp.Error to methods that pass an error to a remote endpoint,
+// this gives you full control over what the remote endpoint will see.
+//
+// You can also pass any Go error to such functions, the remote peer
+// will see the equivalent of MakeError(error)
+//
+type Error struct{ Name, Description string }
+
+// Error implements the Go error interface for AMQP error errors.
- func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) }
++func (c Error) Error() string { return fmt.Sprintf("%s: %s", c.Name, c.Description) }
+
+// Errorf makes a Error with name and formatted description as per fmt.Sprintf
+func Errorf(name, format string, arg ...interface{}) Error {
+ return Error{name, fmt.Sprintf(format, arg...)}
+}
+
+// MakeError makes an AMQP error from a go error using the Go error type as the name
+// and the err.Error() string as the description.
+func MakeError(err error) Error {
+ return Error{reflect.TypeOf(err).Name(), err.Error()}
+}
+
+var (
+ InternalError = "amqp:internal-error"
+ NotFound = "amqp:not-found"
+ UnauthorizedAccess = "amqp:unauthorized-access"
+ DecodeError = "amqp:decode-error"
+ ResourceLimit = "amqp:resource-limit"
+ NotAllowed = "amqp:not-allowed"
+ InvalidField = "amqp:invalid-field"
+ NotImplemented = "amqp:not-implemented"
+ ResourceLocked = "amqp:resource-locked"
+ PreerrorFailed = "amqp:preerror-failed"
+ ResourceDeleted = "amqp:resource-deleted"
+ IllegalState = "amqp:illegal-state"
+ FrameSizeTooSmall = "amqp:frame-size-too-small"
+)
++
++type PnErrorCode int
++
++func (e PnErrorCode) String() string {
++ switch e {
++ case C.PN_EOS:
++ return "end-of-data"
++ case C.PN_ERR:
++ return "error"
++ case C.PN_OVERFLOW:
++ return "overflow"
++ case C.PN_UNDERFLOW:
++ return "underflow"
++ case C.PN_STATE_ERR:
++ return "bad-state"
++ case C.PN_ARG_ERR:
++ return "invalid-argument"
++ case C.PN_TIMEOUT:
++ return "timeout"
++ case C.PN_INTR:
++ return "interrupted"
++ case C.PN_INPROGRESS:
++ return "in-progress"
++ default:
++ return fmt.Sprintf("unknown-error(%d)", e)
++ }
++}
++
++func PnError(e *C.pn_error_t) error {
++ if e == nil || C.pn_error_code(e) == 0 {
++ return nil
++ }
++ return fmt.Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e)))
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/marshal.go
----------------------------------------------------------------------
diff --cc amqp/marshal.go
index 666b4f6,0000000..9930e13
mode 100644,000000..100644
--- a/amqp/marshal.go
+++ b/amqp/marshal.go
@@@ -1,250 -1,0 +1,250 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
++ "fmt"
+ "io"
- "qpid.apache.org/internal"
+ "reflect"
+ "unsafe"
+)
+
+func dataError(prefix string, data *C.pn_data_t) error {
- err := internal.PnError(unsafe.Pointer(C.pn_data_error(data)))
++ err := PnError(C.pn_data_error(data))
+ if err != nil {
- err = internal.Errorf("%s: %s", prefix, err.(internal.Error))
++ err = fmt.Errorf("%s: %s", prefix, err.Error())
+ }
+ return err
+}
+
+/*
+Marshal encodes a Go value as AMQP data in buffer.
+If buffer is nil, or is not large enough, a new buffer is created.
+
+Returns the buffer used for encoding with len() adjusted to the actual size of data.
+
+Go types are encoded as follows
+
+ +-------------------------------------+--------------------------------------------+
+ |Go type |AMQP type |
+ +-------------------------------------+--------------------------------------------+
+ |bool |bool |
+ +-------------------------------------+--------------------------------------------+
+ |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) |
+ +-------------------------------------+--------------------------------------------+
+ |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) |
+ +-------------------------------------+--------------------------------------------+
+ |float32, float64 |float, double. |
+ +-------------------------------------+--------------------------------------------+
+ |string |string |
+ +-------------------------------------+--------------------------------------------+
+ |[]byte, Binary |binary |
+ +-------------------------------------+--------------------------------------------+
+ |Symbol |symbol |
+ +-------------------------------------+--------------------------------------------+
+ |interface{} |the contained type |
+ +-------------------------------------+--------------------------------------------+
+ |nil |null |
+ +-------------------------------------+--------------------------------------------+
+ |map[K]T |map with K and T converted as above |
+ +-------------------------------------+--------------------------------------------+
+ |Map |map, may have mixed types for keys, values |
+ +-------------------------------------+--------------------------------------------+
+ |[]T |list with T converted as above |
+ +-------------------------------------+--------------------------------------------+
+ |List |list, may have mixed types values |
+ +-------------------------------------+--------------------------------------------+
+
+The following Go types cannot be marshaled: uintptr, function, interface, channel
+
+TODO
+
+Go types: array, slice, struct, complex64/128.
+
+AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
+
+Described types.
+
+*/
+func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
+ defer doRecover(&err)
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ marshal(v, data)
+ encode := func(buf []byte) ([]byte, error) {
+ n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
+ switch {
+ case n == int(C.PN_OVERFLOW):
+ return buf, overflow
+ case n < 0:
+ return buf, dataError("marshal error", data)
+ default:
+ return buf[:n], nil
+ }
+ }
+ return encodeGrow(buffer, encode)
+}
+
+const minEncode = 256
+
+// overflow is returned when an encoding function can't fit data in the buffer.
- var overflow = internal.Errorf("buffer too small")
++var overflow = fmt.Errorf("buffer too small")
+
+// encodeFn encodes into buffer[0:len(buffer)].
+// Returns buffer with length adjusted for data encoded.
+// If buffer too small, returns overflow as error.
+type encodeFn func(buffer []byte) ([]byte, error)
+
+// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
+// Returns the final buffer.
+func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
+ if buffer == nil || len(buffer) == 0 {
+ buffer = make([]byte, minEncode)
+ }
+ var err error
+ for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
+ buffer = make([]byte, 2*len(buffer))
+ }
+ return buffer, err
+}
+
+func marshal(v interface{}, data *C.pn_data_t) {
+ switch v := v.(type) {
+ case nil:
+ C.pn_data_put_null(data)
+ case bool:
+ C.pn_data_put_bool(data, C.bool(v))
+ case int8:
+ C.pn_data_put_byte(data, C.int8_t(v))
+ case int16:
+ C.pn_data_put_short(data, C.int16_t(v))
+ case int32:
+ C.pn_data_put_int(data, C.int32_t(v))
+ case int64:
+ C.pn_data_put_long(data, C.int64_t(v))
+ case int:
+ if unsafe.Sizeof(0) == 8 {
+ C.pn_data_put_long(data, C.int64_t(v))
+ } else {
+ C.pn_data_put_int(data, C.int32_t(v))
+ }
+ case uint8:
+ C.pn_data_put_ubyte(data, C.uint8_t(v))
+ case uint16:
+ C.pn_data_put_ushort(data, C.uint16_t(v))
+ case uint32:
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ case uint64:
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ case uint:
+ if unsafe.Sizeof(0) == 8 {
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ } else {
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ }
+ case float32:
+ C.pn_data_put_float(data, C.float(v))
+ case float64:
+ C.pn_data_put_double(data, C.double(v))
+ case string:
+ C.pn_data_put_string(data, pnBytes([]byte(v)))
+ case []byte:
+ C.pn_data_put_binary(data, pnBytes(v))
+ case Binary:
+ C.pn_data_put_binary(data, pnBytes([]byte(v)))
+ case Symbol:
+ C.pn_data_put_symbol(data, pnBytes([]byte(v)))
+ case Map: // Special map type
+ C.pn_data_put_map(data)
+ C.pn_data_enter(data)
+ for key, val := range v {
+ marshal(key, data)
+ marshal(val, data)
+ }
+ C.pn_data_exit(data)
+ default:
+ switch reflect.TypeOf(v).Kind() {
+ case reflect.Map:
+ putMap(data, v)
+ case reflect.Slice:
+ putList(data, v)
+ default:
- panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
++ panic(fmt.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
+ }
+ }
+ err := dataError("marshal", data)
+ if err != nil {
+ panic(err)
+ }
+ return
+}
+
+func clearMarshal(v interface{}, data *C.pn_data_t) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
+
+func putMap(data *C.pn_data_t, v interface{}) {
+ mapValue := reflect.ValueOf(v)
+ C.pn_data_put_map(data)
+ C.pn_data_enter(data)
+ for _, key := range mapValue.MapKeys() {
+ marshal(key.Interface(), data)
+ marshal(mapValue.MapIndex(key).Interface(), data)
+ }
+ C.pn_data_exit(data)
+}
+
+func putList(data *C.pn_data_t, v interface{}) {
+ listValue := reflect.ValueOf(v)
+ C.pn_data_put_list(data)
+ C.pn_data_enter(data)
+ for i := 0; i < listValue.Len(); i++ {
+ marshal(listValue.Index(i).Interface(), data)
+ }
+ C.pn_data_exit(data)
+}
+
+// Encoder encodes AMQP values to an io.Writer
+type Encoder struct {
+ writer io.Writer
+ buffer []byte
+}
+
+// New encoder returns a new encoder that writes to w.
+func NewEncoder(w io.Writer) *Encoder {
+ return &Encoder{w, make([]byte, minEncode)}
+}
+
+func (e *Encoder) Encode(v interface{}) (err error) {
+ e.buffer, err = Marshal(v, e.buffer)
+ if err == nil {
+ e.writer.Write(e.buffer)
+ }
+ return err
+}
+
+func replace(data *C.pn_data_t, v interface{}) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/message.go
----------------------------------------------------------------------
diff --cc amqp/message.go
index 5ba4f4f,0000000..e36c6f2
mode 100644,000000..100644
--- a/amqp/message.go
+++ b/amqp/message.go
@@@ -1,347 -1,0 +1,346 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/types.h>
+// #include <proton/message.h>
+// #include <proton/codec.h>
+// #include <stdlib.h>
+//
+// /* Helper for setting message string fields */
+// typedef int (*set_fn)(pn_message_t*, const char*);
+// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
+// int result = set(m, s);
+// free(s);
+// return result;
+// }
+//
+import "C"
+
+import (
- "qpid.apache.org/internal"
++ "fmt"
+ "runtime"
+ "time"
+ "unsafe"
+)
+
+// Message is the interface to an AMQP message.
+type Message interface {
+ // Durable indicates that any parties taking responsibility
+ // for the message must durably store the content.
+ Durable() bool
+ SetDurable(bool)
+
+ // Priority impacts ordering guarantees. Within a
+ // given ordered context, higher priority messages may jump ahead of
+ // lower priority messages.
+ Priority() uint8
+ SetPriority(uint8)
+
+ // TTL or Time To Live, a message it may be dropped after this duration
+ TTL() time.Duration
+ SetTTL(time.Duration)
+
+ // FirstAcquirer indicates
+ // that the recipient of the message is the first recipient to acquire
+ // the message, i.e. there have been no failed delivery attempts to
+ // other acquirers. Note that this does not mean the message has not
+ // been delivered to, but not acquired, by other recipients.
+ FirstAcquirer() bool
+ SetFirstAcquirer(bool)
+
+ // DeliveryCount tracks how many attempts have been made to
+ // delivery a message.
+ DeliveryCount() uint32
+ SetDeliveryCount(uint32)
+
+ // MessageId provides a unique identifier for a message.
+ // it can be an a string, an unsigned long, a uuid or a
+ // binary value.
+ MessageId() interface{}
+ SetMessageId(interface{})
+
+ UserId() string
+ SetUserId(string)
+
+ Address() string
+ SetAddress(string)
+
+ Subject() string
+ SetSubject(string)
+
+ ReplyTo() string
+ SetReplyTo(string)
+
+ // CorrelationId is set on correlated request and response messages. It can be
+ // an a string, an unsigned long, a uuid or a binary value.
+ CorrelationId() interface{}
+ SetCorrelationId(interface{})
+
+ ContentType() string
+ SetContentType(string)
+
+ ContentEncoding() string
+ SetContentEncoding(string)
+
+ // ExpiryTime indicates an absoulte time when the message may be dropped.
+ // A Zero time (i.e. t.isZero() == true) indicates a message never expires.
+ ExpiryTime() time.Time
+ SetExpiryTime(time.Time)
+
+ CreationTime() time.Time
+ SetCreationTime(time.Time)
+
+ GroupId() string
+ SetGroupId(string)
+
+ GroupSequence() int32
+ SetGroupSequence(int32)
+
+ ReplyToGroupId() string
+ SetReplyToGroupId(string)
+
+ // Instructions - AMQP delivery instructions.
+ Instructions() map[string]interface{}
+ SetInstructions(v map[string]interface{})
+
+ // Annotations - AMQP annotations.
+ Annotations() map[string]interface{}
+ SetAnnotations(v map[string]interface{})
+
+ // Properties - Application properties.
+ Properties() map[string]interface{}
+ SetProperties(v map[string]interface{})
+
+ // Inferred indicates how the message content
+ // is encoded into AMQP sections. If inferred is true then binary and
+ // list values in the body of the message will be encoded as AMQP DATA
+ // and AMQP SEQUENCE sections, respectively. If inferred is false,
+ // then all values in the body of the message will be encoded as AMQP
+ // VALUE sections regardless of their type.
+ Inferred() bool
+ SetInferred(bool)
+
+ // Marshal a Go value into the message body. See amqp.Marshal() for details.
+ Marshal(interface{})
+
+ // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
+ Unmarshal(interface{})
+
+ // Body value resulting from the default unmarshalling of message body as interface{}
+ Body() interface{}
+
+ // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
+ // the message is encoded into it, otherwise a new buffer is created.
+ // Returns the buffer containing the message.
+ Encode(buffer []byte) ([]byte, error)
+
+ // Decode data into this message. Overwrites an existing message content.
+ Decode(buffer []byte) error
+
+ // Clear the message contents.
+ Clear()
+
+ // Copy the contents of another message to this one.
+ Copy(m Message) error
+}
+
+type message struct{ pn *C.pn_message_t }
+
+func freeMessage(m *message) {
+ C.pn_message_free(m.pn)
+ m.pn = nil
+}
+
+// NewMessage creates a new message instance.
+func NewMessage() Message {
+ m := &message{C.pn_message()}
+ runtime.SetFinalizer(m, freeMessage)
+ return m
+}
+
+// NewMessageWith creates a message with value as the body. Equivalent to
+// m := NewMessage(); m.Marshal(body)
+func NewMessageWith(value interface{}) Message {
+ m := NewMessage()
+ m.Marshal(value)
+ return m
+}
+
+func (m *message) Clear() { C.pn_message_clear(m.pn) }
+
+func (m *message) Copy(x Message) error {
+ if data, err := x.Encode(nil); err == nil {
+ return m.Decode(data)
+ } else {
+ return err
+ }
+}
+
+// ==== message get functions
+
+func rewindGet(data *C.pn_data_t) (v interface{}) {
+ C.pn_data_rewind(data)
+ C.pn_data_next(data)
+ unmarshal(&v, data)
+ return v
+}
+
+func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
+ C.pn_data_rewind(data)
+ C.pn_data_next(data)
+ unmarshal(&v, data)
+ return v
+}
+
+func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) }
+func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) }
+func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
+func (m *message) TTL() time.Duration {
+ return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
+}
+func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) }
+func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) }
+func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) }
+func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) }
+func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) }
+func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) }
+func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
+func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
+func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) }
+func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
+
+func (m *message) ExpiryTime() time.Time {
+ return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
+}
+func (m *message) CreationTime() time.Time {
+ return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
+}
+func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) }
+func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) }
+func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
+
+func (m *message) Instructions() map[string]interface{} {
+ return rewindMap(C.pn_message_instructions(m.pn))
+}
+func (m *message) Annotations() map[string]interface{} {
+ return rewindMap(C.pn_message_annotations(m.pn))
+}
+func (m *message) Properties() map[string]interface{} {
+ return rewindMap(C.pn_message_properties(m.pn))
+}
+
+// ==== message set methods
+
+func setData(v interface{}, data *C.pn_data_t) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
+
+func dataString(data *C.pn_data_t) string {
+ str := C.pn_string(C.CString(""))
+ defer C.pn_free(unsafe.Pointer(str))
+ C.pn_inspect(unsafe.Pointer(data), str)
+ return C.GoString(C.pn_string_get(str))
+}
+
+func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(m.Inferred())) }
+func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) }
+func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
+func (m *message) SetTTL(d time.Duration) {
+ C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
+}
+func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
+func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
+func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
+func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
+func (m *message) SetAddress(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
+}
+func (m *message) SetSubject(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
+}
+func (m *message) SetReplyTo(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
+}
+func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
+func (m *message) SetContentType(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
+}
+func (m *message) SetContentEncoding(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
+}
+func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
+func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
+func (m *message) SetGroupId(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
+}
+func (m *message) SetGroupSequence(s int32) {
+ C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
+}
+func (m *message) SetReplyToGroupId(s string) {
+ C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
+}
+
+func (m *message) SetInstructions(v map[string]interface{}) {
+ setData(v, C.pn_message_instructions(m.pn))
+}
+func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) }
+func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) }
+
+// Marshal/Unmarshal body
+func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) }
+func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) }
+func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
+
+func (m *message) Decode(data []byte) error {
+ m.Clear()
+ if len(data) == 0 {
- return internal.Errorf("empty buffer for decode")
++ return fmt.Errorf("empty buffer for decode")
+ }
+ if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
- return internal.Errorf("decoding message: %s",
- internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn))))
++ return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn)))
+ }
+ return nil
+}
+
+func DecodeMessage(data []byte) (m Message, err error) {
+ m = NewMessage()
+ err = m.Decode(data)
+ return
+}
+
+func (m *message) Encode(buffer []byte) ([]byte, error) {
+ encode := func(buf []byte) ([]byte, error) {
+ len := cLen(buf)
+ result := C.pn_message_encode(m.pn, cPtr(buf), &len)
+ switch {
+ case result == C.PN_OVERFLOW:
+ return buf, overflow
+ case result < 0:
- return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result))
++ return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(result))
+ default:
+ return buf[:len], nil
+ }
+ }
+ return encodeGrow(buffer, encode)
+}
+
+// TODO aconway 2015-09-14: Multi-section messages.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/types.go
----------------------------------------------------------------------
diff --cc amqp/types.go
index 796da66,0000000..abcff25
mode 100644,000000..100644
--- a/amqp/types.go
+++ b/amqp/types.go
@@@ -1,199 -1,0 +1,203 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "bytes"
+ "fmt"
+ "reflect"
+ "time"
+ "unsafe"
+)
+
+type Type C.pn_type_t
+
++// Older proton versions don't define C.PN_INVALID, so define it here.
++// In C it is pn_type_t(-1), in Go use the bitwise NOT operator to get the same value.
++const pnInvalid = ^C.pn_type_t(0)
++
+func (t Type) String() string {
+ switch C.pn_type_t(t) {
+ case C.PN_NULL:
+ return "null"
+ case C.PN_BOOL:
+ return "bool"
+ case C.PN_UBYTE:
+ return "ubyte"
+ case C.PN_BYTE:
+ return "byte"
+ case C.PN_USHORT:
+ return "ushort"
+ case C.PN_SHORT:
+ return "short"
+ case C.PN_CHAR:
+ return "char"
+ case C.PN_UINT:
+ return "uint"
+ case C.PN_INT:
+ return "int"
+ case C.PN_ULONG:
+ return "ulong"
+ case C.PN_LONG:
+ return "long"
+ case C.PN_TIMESTAMP:
+ return "timestamp"
+ case C.PN_FLOAT:
+ return "float"
+ case C.PN_DOUBLE:
+ return "double"
+ case C.PN_DECIMAL32:
+ return "decimal32"
+ case C.PN_DECIMAL64:
+ return "decimal64"
+ case C.PN_DECIMAL128:
+ return "decimal128"
+ case C.PN_UUID:
+ return "uuid"
+ case C.PN_BINARY:
+ return "binary"
+ case C.PN_STRING:
+ return "string"
+ case C.PN_SYMBOL:
+ return "symbol"
+ case C.PN_DESCRIBED:
+ return "described"
+ case C.PN_ARRAY:
+ return "array"
+ case C.PN_LIST:
+ return "list"
+ case C.PN_MAP:
+ return "map"
+ default:
- if uint32(t) == uint32(C.PN_INVALID) {
++ if uint32(t) == uint32(pnInvalid) {
+ return "no-data"
+ }
+ return fmt.Sprintf("unknown-type(%d)", t)
+ }
+}
+
+// Go types
+var (
+ bytesType = reflect.TypeOf([]byte{})
+ valueType = reflect.TypeOf(reflect.Value{})
+)
+
+// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys.
+
+// Map is a generic map that can have mixed key and value types and so can represent any AMQP map
+type Map map[interface{}]interface{}
+
+// List is a generic list that can hold mixed values and can represent any AMQP list.
+//
+type List []interface{}
+
+// Symbol is a string that is encoded as an AMQP symbol
+type Symbol string
+
+func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
+
+// Binary is a string that is encoded as an AMQP binary.
+// It is a string rather than a byte[] because byte[] is not hashable and can't be used as
+// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte
+type Binary string
+
+func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
+
+// GoString for Map prints values with their types, useful for debugging.
+func (m Map) GoString() string {
+ out := &bytes.Buffer{}
+ fmt.Fprintf(out, "%T{", m)
+ i := len(m)
+ for k, v := range m {
+ fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
+ i--
+ if i > 0 {
+ fmt.Fprint(out, ", ")
+ }
+ }
+ fmt.Fprint(out, "}")
+ return out.String()
+}
+
+// GoString for List prints values with their types, useful for debugging.
+func (l List) GoString() string {
+ out := &bytes.Buffer{}
+ fmt.Fprintf(out, "%T{", l)
+ for i := 0; i < len(l); i++ {
+ fmt.Fprintf(out, "%T(%#v)", l[i], l[i])
+ if i == len(l)-1 {
+ fmt.Fprint(out, ", ")
+ }
+ }
+ fmt.Fprint(out, "}")
+ return out.String()
+}
+
+// pnTime converts Go time.Time to Proton millisecond Unix time.
+func pnTime(t time.Time) C.pn_timestamp_t {
+ secs := t.Unix()
+ // Note: sub-second accuracy is not guaraunteed if the Unix time in
+ // nanoseconds cannot be represented by an int64 (sometime around year 2260)
+ msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
+ return C.pn_timestamp_t(secs*1000 + msecs)
+}
+
+// goTime converts a pn_timestamp_t to a Go time.Time.
+func goTime(t C.pn_timestamp_t) time.Time {
+ secs := int64(t) / 1000
+ nsecs := (int64(t) % 1000) * int64(time.Millisecond)
+ return time.Unix(secs, nsecs)
+}
+
+func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
+ if cBytes.start != nil {
+ bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
+ }
+ return
+}
+
+func goString(cBytes C.pn_bytes_t) (str string) {
+ if cBytes.start != nil {
+ str = C.GoStringN(cBytes.start, C.int(cBytes.size))
+ }
+ return
+}
+
+func pnBytes(b []byte) C.pn_bytes_t {
+ if len(b) == 0 {
+ return C.pn_bytes_t{0, nil}
+ } else {
+ return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))}
+ }
+}
+
+func cPtr(b []byte) *C.char {
+ if len(b) == 0 {
+ return nil
+ }
+ return (*C.char)(unsafe.Pointer(&b[0]))
+}
+
+func cLen(b []byte) C.size_t {
+ return C.size_t(len(b))
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/unmarshal.go
----------------------------------------------------------------------
diff --cc amqp/unmarshal.go
index 751921d,0000000..25bb519
mode 100644,000000..100644
--- a/amqp/unmarshal.go
+++ b/amqp/unmarshal.go
@@@ -1,558 -1,0 +1,561 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+oor more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "bytes"
+ "fmt"
+ "io"
- "qpid.apache.org/internal"
+ "reflect"
+ "unsafe"
+)
+
+const minDecode = 1024
+
+// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
+type UnmarshalError struct {
+ // The name of the AMQP type.
+ AMQPType string
+ // The Go type.
+ GoType reflect.Type
+}
+
+func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
+ return &UnmarshalError{Type(pnType).String(), reflect.TypeOf(v)}
+}
+
+func (e UnmarshalError) Error() string {
+ if e.GoType.Kind() != reflect.Ptr {
- return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType)
++ return fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType)
+ } else {
- return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
++ return fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
+ }
+}
+
+func doRecover(err *error) {
+ r := recover()
+ switch r := r.(type) {
+ case nil:
- case *UnmarshalError, internal.Error:
- *err = r.(error)
++ case *UnmarshalError:
++ *err = r
+ default:
+ panic(r)
+ }
+}
+
+//
+// Decoding from a pn_data_t
+//
+// NOTE: we use panic() to signal a decoding error, simplifies decoding logic.
+// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
+//
+
+// Decoder decodes AMQP values from an io.Reader.
+//
+type Decoder struct {
+ reader io.Reader
+ buffer bytes.Buffer
+}
+
+// NewDecoder returns a new decoder that reads from r.
+//
+// The decoder has it's own buffer and may read more data than required for the
+// AMQP values requested. Use Buffered to see if there is data left in the
+// buffer.
+//
+func NewDecoder(r io.Reader) *Decoder {
+ return &Decoder{r, bytes.Buffer{}}
+}
+
+// Buffered returns a reader of the data remaining in the Decoder's buffer. The
+// reader is valid until the next call to Decode.
+//
+func (d *Decoder) Buffered() io.Reader {
+ return bytes.NewReader(d.buffer.Bytes())
+}
+
+// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
+//
+// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
+//
+func (d *Decoder) Decode(v interface{}) (err error) {
+ defer doRecover(&err)
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ var n int
- for n == 0 && err == nil {
- n = decode(data, d.buffer.Bytes())
++ for n == 0 {
++ n, err = decode(data, d.buffer.Bytes())
++ if err != nil {
++ return err
++ }
+ if n == 0 { // n == 0 means not enough data, read more
+ err = d.more()
+ } else {
+ unmarshal(v, data)
+ }
+ }
+ d.buffer.Next(n)
+ return
+}
+
+/*
+Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
+Types are converted as follows:
+
+ +---------------------------+----------------------------------------------------------------------+
+ |To Go types |From AMQP types |
+ +===========================+======================================================================+
+ |bool |bool |
+ +---------------------------+----------------------------------------------------------------------+
+ |int, int8, int16, |Equivalent or smaller signed integer type: byte, short, int, long. |
+ |int32, int64 | |
+ +---------------------------+----------------------------------------------------------------------+
+ |uint, uint8, uint16, |Equivalent or smaller unsigned integer type: ubyte, ushort, uint, |
+ |uint32, uint64 types |ulong |
+ +---------------------------+----------------------------------------------------------------------+
+ |float32, float64 |Equivalent or smaller float or double. |
+ +---------------------------+----------------------------------------------------------------------+
+ |string, []byte |string, symbol or binary. |
+ +---------------------------+----------------------------------------------------------------------+
+ |Symbol |symbol |
+ +---------------------------+----------------------------------------------------------------------+
+ |map[K]T |map, provided all keys and values can unmarshal to types K, T |
+ +---------------------------+----------------------------------------------------------------------+
+ |Map |map, any AMQP map |
+ +---------------------------+----------------------------------------------------------------------+
+ |interface{} |Any AMQP value can be unmarshaled to an interface{} as follows: |
+ | +------------------------+---------------------------------------------+
+ | |AMQP Type |Go Type in interface{} |
+ | +========================+=============================================+
+ | |bool |bool |
+ | +------------------------+---------------------------------------------+
+ | |byte,short,int,long |int8,int16,int32,int64 |
+ | +------------------------+---------------------------------------------+
+ | |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64 |
+ | +------------------------+---------------------------------------------+
+ | |float, double |float32, float64 |
+ | +------------------------+---------------------------------------------+
+ | |string |string |
+ | +------------------------+---------------------------------------------+
+ | |symbol |Symbol |
+ | +------------------------+---------------------------------------------+
+ | |binary |Binary |
+ | +------------------------+---------------------------------------------+
+ | |nulll |nil |
+ | +------------------------+---------------------------------------------+
+ | |map |Map |
+ | +------------------------+---------------------------------------------+
+ | |list |List |
+ +---------------------------+------------------------+---------------------------------------------+
+
+The following Go types cannot be unmarshaled: uintptr, function, interface, channel.
+
+TODO
+
+Go types: array, struct.
+
+AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies.
+
+AMQP maps with mixed/unhashable key types need an alternate representation.
+
+Described types.
+*/
+func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
+ defer doRecover(&err)
+
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
- n = decode(data, bytes)
++ n, err = decode(data, bytes)
++ if err != nil {
++ return 0, err
++ }
+ if n == 0 {
- err = internal.Errorf("not enough data")
++ return 0, fmt.Errorf("not enough data")
+ } else {
+ unmarshal(v, data)
+ }
- return
++ return n, nil
+}
+
+// more reads more data when we can't parse a complete AMQP type
+func (d *Decoder) more() error {
+ var readSize int64 = minDecode
+ if int64(d.buffer.Len()) > readSize { // Grow by doubling
+ readSize = int64(d.buffer.Len())
+ }
+ var n int64
+ n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
+ if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
+ err = io.EOF
+ }
+ return err
+}
+
+// Unmarshal from data into value pointed at by v.
+func unmarshal(v interface{}, data *C.pn_data_t) {
+ pnType := C.pn_data_type(data)
+ switch v := v.(type) {
+ case *bool:
+ switch pnType {
+ case C.PN_BOOL:
+ *v = bool(C.pn_data_get_bool(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *int8:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int8(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int8(C.pn_data_get_byte(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *uint8:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint8(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint8(C.pn_data_get_ubyte(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *int16:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int16(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int16(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int16(C.pn_data_get_short(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *uint16:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint16(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint16(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint16(C.pn_data_get_ushort(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *int32:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int32(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int32(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int32(C.pn_data_get_short(data))
+ case C.PN_INT:
+ *v = int32(C.pn_data_get_int(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ case *uint32:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint32(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint32(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint32(C.pn_data_get_ushort(data))
+ case C.PN_UINT:
+ *v = uint32(C.pn_data_get_uint(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *int64:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int64(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int64(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int64(C.pn_data_get_short(data))
+ case C.PN_INT:
+ *v = int64(C.pn_data_get_int(data))
+ case C.PN_LONG:
+ *v = int64(C.pn_data_get_long(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *uint64:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint64(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint64(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint64(C.pn_data_get_ushort(data))
+ case C.PN_ULONG:
+ *v = uint64(C.pn_data_get_ulong(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *int:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = int(C.pn_data_get_char(data))
+ case C.PN_BYTE:
+ *v = int(C.pn_data_get_byte(data))
+ case C.PN_SHORT:
+ *v = int(C.pn_data_get_short(data))
+ case C.PN_INT:
+ *v = int(C.pn_data_get_int(data))
+ case C.PN_LONG:
+ if unsafe.Sizeof(0) == 8 {
+ *v = int(C.pn_data_get_long(data))
+ } else {
+ panic(newUnmarshalError(pnType, v))
+ }
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *uint:
+ switch pnType {
+ case C.PN_CHAR:
+ *v = uint(C.pn_data_get_char(data))
+ case C.PN_UBYTE:
+ *v = uint(C.pn_data_get_ubyte(data))
+ case C.PN_USHORT:
+ *v = uint(C.pn_data_get_ushort(data))
+ case C.PN_UINT:
+ *v = uint(C.pn_data_get_uint(data))
+ case C.PN_ULONG:
+ if unsafe.Sizeof(0) == 8 {
+ *v = uint(C.pn_data_get_ulong(data))
+ } else {
+ panic(newUnmarshalError(pnType, v))
+ }
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *float32:
+ switch pnType {
+ case C.PN_FLOAT:
+ *v = float32(C.pn_data_get_float(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *float64:
+ switch pnType {
+ case C.PN_FLOAT:
+ *v = float64(C.pn_data_get_float(data))
+ case C.PN_DOUBLE:
+ *v = float64(C.pn_data_get_double(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *string:
+ switch pnType {
+ case C.PN_STRING:
+ *v = goString(C.pn_data_get_string(data))
+ case C.PN_SYMBOL:
+ *v = goString(C.pn_data_get_symbol(data))
+ case C.PN_BINARY:
+ *v = goString(C.pn_data_get_binary(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *[]byte:
+ switch pnType {
+ case C.PN_STRING:
+ *v = goBytes(C.pn_data_get_string(data))
+ case C.PN_SYMBOL:
+ *v = goBytes(C.pn_data_get_symbol(data))
+ case C.PN_BINARY:
+ *v = goBytes(C.pn_data_get_binary(data))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *Binary:
+ switch pnType {
+ case C.PN_BINARY:
+ *v = Binary(goBytes(C.pn_data_get_binary(data)))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *Symbol:
+ switch pnType {
+ case C.PN_SYMBOL:
+ *v = Symbol(goBytes(C.pn_data_get_symbol(data)))
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+
+ case *interface{}:
+ getInterface(data, v)
+
+ default:
+ if reflect.TypeOf(v).Kind() != reflect.Ptr {
+ panic(newUnmarshalError(pnType, v))
+ }
+ switch reflect.TypeOf(v).Elem().Kind() {
+ case reflect.Map:
+ getMap(data, v)
+ case reflect.Slice:
+ getList(data, v)
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+ }
+ err := dataError("unmarshaling", data)
+ if err != nil {
+ panic(err)
+ }
+ return
+}
+
+func rewindUnmarshal(v interface{}, data *C.pn_data_t) {
+ C.pn_data_rewind(data)
+ C.pn_data_next(data)
+ unmarshal(v, data)
+}
+
+// Getting into an interface is driven completely by the AMQP type, since the interface{}
+// target is type-neutral.
+func getInterface(data *C.pn_data_t, v *interface{}) {
+ pnType := C.pn_data_type(data)
+ switch pnType {
- // Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t
- case C.PN_NULL, C.pn_type_t(C.PN_INVALID): // No data.
++ case C.PN_NULL, C.pn_type_t(pnInvalid): // No data.
+ *v = nil
+ case C.PN_BOOL:
+ *v = bool(C.pn_data_get_bool(data))
+ case C.PN_UBYTE:
+ *v = uint8(C.pn_data_get_ubyte(data))
+ case C.PN_BYTE:
+ *v = int8(C.pn_data_get_byte(data))
+ case C.PN_USHORT:
+ *v = uint16(C.pn_data_get_ushort(data))
+ case C.PN_SHORT:
+ *v = int16(C.pn_data_get_short(data))
+ case C.PN_UINT:
+ *v = uint32(C.pn_data_get_uint(data))
+ case C.PN_INT:
+ *v = int32(C.pn_data_get_int(data))
+ case C.PN_CHAR:
+ *v = uint8(C.pn_data_get_char(data))
+ case C.PN_ULONG:
+ *v = uint64(C.pn_data_get_ulong(data))
+ case C.PN_LONG:
+ *v = int64(C.pn_data_get_long(data))
+ case C.PN_FLOAT:
+ *v = float32(C.pn_data_get_float(data))
+ case C.PN_DOUBLE:
+ *v = float64(C.pn_data_get_double(data))
+ case C.PN_BINARY:
+ *v = Binary(goBytes(C.pn_data_get_binary(data)))
+ case C.PN_STRING:
+ *v = goString(C.pn_data_get_string(data))
+ case C.PN_SYMBOL:
+ *v = Symbol(goString(C.pn_data_get_symbol(data)))
+ case C.PN_MAP:
+ m := make(Map)
+ unmarshal(&m, data)
+ *v = m
+ case C.PN_LIST:
+ l := make(List, 0)
+ unmarshal(&l, data)
+ *v = l
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+}
+
+// get into map pointed at by v
+func getMap(data *C.pn_data_t, v interface{}) {
+ mapValue := reflect.ValueOf(v).Elem()
+ mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
+ switch pnType := C.pn_data_type(data); pnType {
+ case C.PN_MAP:
+ count := int(C.pn_data_get_map(data))
+ if bool(C.pn_data_enter(data)) {
+ defer C.pn_data_exit(data)
+ for i := 0; i < count/2; i++ {
+ if bool(C.pn_data_next(data)) {
+ key := reflect.New(mapValue.Type().Key())
+ unmarshal(key.Interface(), data)
+ if bool(C.pn_data_next(data)) {
+ val := reflect.New(mapValue.Type().Elem())
+ unmarshal(val.Interface(), data)
+ mapValue.SetMapIndex(key.Elem(), val.Elem())
+ }
+ }
+ }
+ }
- // Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t
- case C.pn_type_t(C.PN_INVALID): // Leave the map empty
++ case C.pn_type_t(pnInvalid): // Leave the map empty
+ default:
+ panic(newUnmarshalError(pnType, v))
+ }
+}
+
+func getList(data *C.pn_data_t, v interface{}) {
+ pnType := C.pn_data_type(data)
+ if pnType != C.PN_LIST {
+ panic(newUnmarshalError(pnType, v))
+ }
+ count := int(C.pn_data_get_list(data))
+ listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
+ if bool(C.pn_data_enter(data)) {
+ for i := 0; i < count; i++ {
+ if bool(C.pn_data_next(data)) {
+ val := reflect.New(listValue.Type().Elem())
+ unmarshal(val.Interface(), data)
+ listValue.Index(i).Set(val.Elem())
+ }
+ }
+ C.pn_data_exit(data)
+ }
+ reflect.ValueOf(v).Elem().Set(listValue)
+}
+
+// decode from bytes.
+// Return bytes decoded or 0 if we could not decode a complete object.
+//
- func decode(data *C.pn_data_t, bytes []byte) int {
++func decode(data *C.pn_data_t, bytes []byte) (int, error) {
+ if len(bytes) == 0 {
- return 0
++ return 0, nil
+ }
+ n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
+ if n == int(C.PN_UNDERFLOW) {
+ C.pn_error_clear(C.pn_data_error(data))
- return 0
++ return 0, nil
+ } else if n <= 0 {
- panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n)))
++ return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n))
+ }
- return n
++ return n, nil
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/url.go
----------------------------------------------------------------------
diff --cc amqp/url.go
index 0d0c662,0000000..70545d2
mode 100644,000000..100644
--- a/amqp/url.go
+++ b/amqp/url.go
@@@ -1,96 -1,0 +1,96 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+/*
+#include <stdlib.h>
+#include <string.h>
+#include <proton/url.h>
+
+// Helper function for setting URL fields.
+typedef void (*setter_fn)(pn_url_t* url, const char* value);
+inline void set(pn_url_t *url, setter_fn s, const char* value) {
+ s(url, value);
+}
+*/
+import "C"
+
+import (
++ "fmt"
+ "net"
+ "net/url"
- "qpid.apache.org/internal"
+ "unsafe"
+)
+
+const (
+ amqp string = "amqp"
+ amqps = "amqps"
+)
+
+// ParseUrl parses an AMQP URL string and returns a net/url.Url.
+//
+// It is more forgiving than net/url.Parse and allows most of the parts of the
+// URL to be missing, assuming AMQP defaults.
+//
+func ParseURL(s string) (u *url.URL, err error) {
+ cstr := C.CString(s)
+ defer C.free(unsafe.Pointer(cstr))
+ pnUrl := C.pn_url_parse(cstr)
+ if pnUrl == nil {
- return nil, internal.Errorf("bad URL %#v", s)
++ return nil, fmt.Errorf("bad URL %#v", s)
+ }
+ defer C.pn_url_free(pnUrl)
+
+ scheme := C.GoString(C.pn_url_get_scheme(pnUrl))
+ username := C.GoString(C.pn_url_get_username(pnUrl))
+ password := C.GoString(C.pn_url_get_password(pnUrl))
+ host := C.GoString(C.pn_url_get_host(pnUrl))
+ port := C.GoString(C.pn_url_get_port(pnUrl))
+ path := C.GoString(C.pn_url_get_path(pnUrl))
+
+ if err != nil {
- return nil, internal.Errorf("bad URL %#v: %s", s, err)
++ return nil, fmt.Errorf("bad URL %#v: %s", s, err)
+ }
+ if scheme == "" {
+ scheme = amqp
+ }
+ if port == "" {
+ if scheme == amqps {
+ port = amqps
+ } else {
+ port = amqp
+ }
+ }
+ var user *url.Userinfo
+ if password != "" {
+ user = url.UserPassword(username, password)
+ } else if username != "" {
+ user = url.User(username)
+ }
+
+ u = &url.URL{
+ Scheme: scheme,
+ User: user,
+ Host: net.JoinHostPort(host, port),
+ Path: path,
+ }
+
+ return u, nil
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/url_test.go
----------------------------------------------------------------------
diff --cc amqp/url_test.go
index f80f1c4,0000000..99b656d
mode 100644,000000..100644
--- a/amqp/url_test.go
+++ b/amqp/url_test.go
@@@ -1,51 -1,0 +1,51 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+import (
+ "fmt"
+)
+
+func ExampleParseURL() {
+ for _, s := range []string{
+ "amqp://username:password@host:1234/path",
+ "host:1234",
+ "host",
+ ":1234",
+ "host/path",
+ "amqps://host",
+ "",
+ } {
+ u, err := ParseURL(s)
+ if err != nil {
+ fmt.Println(err)
+ } else {
+ fmt.Println(u)
+ }
+ }
+ // Output:
+ // amqp://username:password@host:1234/path
+ // amqp://host:1234
+ // amqp://host:amqp
+ // amqp://:1234
+ // amqp://host:amqp/path
+ // amqps://host:amqps
- // proton: bad URL ""
++ // bad URL ""
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index d6761d6,0000000..8a9e6cd
mode 100644,000000..100644
--- a/electron/connection.go
+++ b/electron/connection.go
@@@ -1,218 -1,0 +1,238 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+// #include <proton/disposition.h>
+import "C"
+
+import (
+ "net"
- "qpid.apache.org/amqp"
- "qpid.apache.org/internal"
+ "qpid.apache.org/proton"
+ "sync"
+ "time"
+)
+
+// Connection is an AMQP connection, created by a Container.
+type Connection interface {
+ Endpoint
+
+ // Sender opens a new sender on the DefaultSession.
- //
- // v can be a string, which is used as the Target address, or a SenderSettings
- // struct containing more details settings.
- Sender(...LinkSetting) (Sender, error)
++ Sender(...LinkOption) (Sender, error)
+
+ // Receiver opens a new Receiver on the DefaultSession().
- //
- // v can be a string, which is used as the
- // Source address, or a ReceiverSettings struct containing more details
- // settings.
- Receiver(...LinkSetting) (Receiver, error)
++ Receiver(...LinkOption) (Receiver, error)
+
+ // DefaultSession() returns a default session for the connection. It is opened
+ // on the first call to DefaultSession and returned on subsequent calls.
+ DefaultSession() (Session, error)
+
+ // Session opens a new session.
- Session(...SessionSetting) (Session, error)
++ Session(...SessionOption) (Session, error)
+
+ // Container for the connection.
+ Container() Container
+
+ // Disconnect the connection abruptly with an error.
+ Disconnect(error)
+
+ // Wait waits for the connection to be disconnected.
+ Wait() error
+
+ // WaitTimeout is like Wait but returns Timeout if the timeout expires.
+ WaitTimeout(time.Duration) error
++
++ // Incoming returns a channel for incoming endpoints opened by the remote end.
++ //
++ // To enable, pass AllowIncoming() when creating the Connection. Otherwise all
++ // incoming endpoint requests are automatically rejected and Incoming()
++ // returns nil.
++ //
++ // An Incoming value can be an *IncomingSession, *IncomingSender or
++ // *IncomingReceiver. You must call Accept() to open the endpoint or Reject()
++ // to close it with an error. The specific Incoming types have additional
++ // methods to configure the endpoint.
++ //
++ // Not receiving from Incoming() or not calling Accept/Reject will block the
++ // electron event loop. Normally you would have a dedicated goroutine receive
++ // from Incoming() and start new goroutines to serve each incoming endpoint.
++ // The channel is closed when the Connection closes.
++ //
++ Incoming() <-chan Incoming
+}
+
- // ConnectionSetting can be passed when creating a connection.
- // See functions that return ConnectionSetting for details
- type ConnectionSetting func(*connection)
++// ConnectionOption can be passed when creating a connection to configure various options
++type ConnectionOption func(*connection)
+
- // Server setting puts the connection in server mode.
++// Server returns a ConnectionOption to put the connection in server mode.
+//
+// A server connection will do protocol negotiation to accept a incoming AMQP
+// connection. Normally you would call this for a connection created by
+// net.Listener.Accept()
+//
- func Server() ConnectionSetting { return func(c *connection) { c.engine.Server() } }
++func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } }
+
- // Accepter provides a function to be called when a connection receives an incoming
- // request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver.
- //
- // The accept() function must not block or use the accepted endpoint.
- // It can pass the endpoint to another goroutine for processing.
- //
- // By default all incoming endpoints are rejected.
- func Accepter(accept func(Incoming)) ConnectionSetting {
- return func(c *connection) { c.accept = accept }
++// AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests.
++// See Connection.Incoming()
++func AllowIncoming() ConnectionOption {
++ return func(c *connection) { c.incoming = make(chan Incoming) }
+}
+
+type connection struct {
+ endpoint
- listenOnce, defaultSessionOnce, closeOnce sync.Once
++ defaultSessionOnce, closeOnce sync.Once
+
+ container *container
+ conn net.Conn
- accept func(Incoming)
++ incoming chan Incoming
+ handler *handler
+ engine *proton.Engine
- err internal.ErrorHolder
+ eConnection proton.Connection
+
+ defaultSession Session
- done chan struct{}
+}
+
- func newConnection(conn net.Conn, cont *container, setting ...ConnectionSetting) (*connection, error) {
- c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})}
++func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) {
++ c := &connection{container: cont, conn: conn}
+ c.handler = newHandler(c)
+ var err error
+ c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
+ if err != nil {
+ return nil, err
+ }
+ for _, set := range setting {
+ set(c)
+ }
- c.str = c.engine.String()
++ c.endpoint = makeEndpoint(c.engine.String())
+ c.eConnection = c.engine.Connection()
- go func() { c.engine.Run(); close(c.done) }()
++ go c.run()
+ return c, nil
+}
+
++func (c *connection) run() {
++ c.engine.Run()
++ if c.incoming != nil {
++ close(c.incoming)
++ }
++ c.closed(Closed)
++}
++
+func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
+
+func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) }
+
- func (c *connection) Session(setting ...SessionSetting) (Session, error) {
++func (c *connection) Session(setting ...SessionOption) (Session, error) {
+ var s Session
+ err := c.engine.InjectWait(func() error {
++ if c.Error() != nil {
++ return c.Error()
++ }
+ eSession, err := c.engine.Connection().Session()
+ if err == nil {
+ eSession.Open()
+ if err == nil {
+ s = newSession(c, eSession, setting...)
+ }
+ }
+ return err
+ })
+ return s, err
+}
+
+func (c *connection) Container() Container { return c.container }
+
+func (c *connection) DefaultSession() (s Session, err error) {
+ c.defaultSessionOnce.Do(func() {
+ c.defaultSession, err = c.Session()
+ })
+ if err == nil {
+ err = c.Error()
+ }
+ return c.defaultSession, err
+}
+
- func (c *connection) Sender(setting ...LinkSetting) (Sender, error) {
++func (c *connection) Sender(setting ...LinkOption) (Sender, error) {
+ if s, err := c.DefaultSession(); err == nil {
+ return s.Sender(setting...)
+ } else {
+ return nil, err
+ }
+}
+
- func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) {
++func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) {
+ if s, err := c.DefaultSession(); err == nil {
+ return s.Receiver(setting...)
+ } else {
+ return nil, err
+ }
+}
+
+func (c *connection) Connection() Connection { return c }
+
+func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
+func (c *connection) WaitTimeout(timeout time.Duration) error {
+ _, err := timedReceive(c.done, timeout)
+ if err == Timeout {
+ return Timeout
+ }
+ return c.Error()
+}
+
++func (c *connection) Incoming() <-chan Incoming { return c.incoming }
++
+// Incoming is the interface for incoming requests to open an endpoint.
+// Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
+type Incoming interface {
- // Accept the endpoint with default settings.
- //
- // You must not use the returned endpoint in the accept() function that
- // receives the Incoming value, but you can pass it to other goroutines.
- //
- // Implementing types provide type-specific Accept functions that take additional settings.
++ // Accept and open the endpoint.
+ Accept() Endpoint
+
+ // Reject the endpoint with an error
+ Reject(error)
+
- error() error
++ // wait for and call the accept function, call in proton goroutine.
++ wait() error
++ pEndpoint() proton.Endpoint
+}
+
+type incoming struct {
- err error
- accepted bool
++ endpoint proton.Endpoint
++ acceptCh chan func() error
++}
++
++func makeIncoming(e proton.Endpoint) incoming {
++ return incoming{endpoint: e, acceptCh: make(chan func() error)}
+}
+
- func (i *incoming) Reject(err error) { i.err = err }
++func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } }
++
++// Call in proton goroutine, wait for and call the accept function fr
++func (in *incoming) wait() error { return (<-in.acceptCh)() }
++
++func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint }
+
- func (i *incoming) error() error {
- switch {
- case i.err != nil:
- return i.err
- case !i.accepted:
- return amqp.Errorf(amqp.NotAllowed, "remote open rejected")
- default:
++// Called in app goroutine to send an accept function to proton and return the resulting endpoint.
++func (in *incoming) accept(f func() Endpoint) Endpoint {
++ done := make(chan Endpoint)
++ in.acceptCh <- func() error {
++ ep := f()
++ done <- ep
+ return nil
+ }
++ return <-done
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/container.go
----------------------------------------------------------------------
diff --cc electron/container.go
index 7bbc4b0,0000000..b5ce6c0
mode 100644,000000..100644
--- a/electron/container.go
+++ b/electron/container.go
@@@ -1,71 -1,0 +1,77 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "net"
- "qpid.apache.org/internal"
++ "qpid.apache.org/proton"
++ "strconv"
++ "sync/atomic"
+)
+
+// Container is an AMQP container, it represents a single AMQP "application".It
+// provides functions to create new Connections to remote containers.
+//
+// Create with NewContainer()
+//
+type Container interface {
+ // Id is a unique identifier for the container in your distributed application.
+ Id() string
+
+ // Create a new AMQP Connection over the supplied net.Conn connection.
+ //
+ // You must call Connection.Open() on the returned Connection, after
+ // setting any Connection properties you need to set. Note the net.Conn
+ // can be an outgoing connection (e.g. made with net.Dial) or an incoming
+ // connection (e.g. made with net.Listener.Accept())
- Connection(net.Conn, ...ConnectionSetting) (Connection, error)
++ Connection(net.Conn, ...ConnectionOption) (Connection, error)
+}
+
+type container struct {
- id string
- linkNames internal.IdCounter
++ id string
++ tagCounter uint64
++}
++
++func (cont *container) nextTag() string {
++ return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32)
+}
+
+// NewContainer creates a new container. The id must be unique in your
+// distributed application, all connections created by the container
+// will have this container-id.
+//
+// If id == "" a random UUID will be generated for the id.
+func NewContainer(id string) Container {
+ if id == "" {
- id = internal.UUID4().String()
++ id = proton.UUID4().String()
+ }
+ cont := &container{id: id}
+ return cont
+}
+
+func (cont *container) Id() string { return cont.id }
+
+func (cont *container) nextLinkName() string {
- return cont.id + "@" + cont.linkNames.Next()
++ return cont.id + "@" + cont.nextTag()
+}
+
- func (cont *container) Connection(conn net.Conn, setting ...ConnectionSetting) (Connection, error) {
++func (cont *container) Connection(conn net.Conn, setting ...ConnectionOption) (Connection, error) {
+ return newConnection(conn, cont, setting...)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/doc.go
----------------------------------------------------------------------
diff --cc electron/doc.go
index eaa6e7a,0000000..46bde37
mode 100644,000000..100644
--- a/electron/doc.go
+++ b/electron/doc.go
@@@ -1,57 -1,0 +1,63 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/*
+Package electron is a procedural, concurrent-safe Go library for AMQP messaging.
+You can write clients and servers using this library.
+
+Start by creating a Container with NewContainer. A Container represents a client
+or server application that can contain many incoming or outgoing connections.
+
+Create connections with the standard Go 'net' package using net.Dial or
+net.Listen. Create an AMQP connection over a net.Conn with
+Container.Connection() and open it with Connection.Open().
+
+AMQP sends messages over "links". Each link has a Sender end and a Receiver
+end. Connection.Sender() and Connection.Receiver() allow you to create links to
+Send() and Receive() messages.
+
+You can create an AMQP server connection by calling Connection.Server() and
+Connection.Listen() before calling Connection.Open(). A server connection can
+negotiate protocol security details and can accept incoming links opened from
- the remote end of the connection
++the remote end of the connection.
++
+*/
+package electron
+
+//#cgo LDFLAGS: -lqpid-proton
+import "C"
+
+// Just for package comment
+
+/* DEVELOPER NOTES
+
+There is a single proton.Engine per connection, each driving it's own event-loop goroutine,
+and each with a 'handler'. Most state for a connection is maintained on the handler, and
- only accessed in the event-loop goroutine, so no locks are required.
++only accessed in the event-loop goroutine, so no locks are required there.
+
+The handler sets up channels as needed to get or send data from user goroutines
- using electron types like Sender or Receiver. We also use Engine.Inject to inject
- actions into the event loop from user goroutines.
++using electron types like Sender or Receiver.
++
++We also use Engine.Inject to inject actions into the event loop from user
++goroutines. It is important to check at the start of an injected function that
++required objects are still valid, for example a link may be remotely closed
++between the time a Sender function calls Inject and the time the injected
++function is execute by the handler goroutine. See comments in endpoint.go for more.
+
+*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/endpoint.go
----------------------------------------------------------------------
diff --cc electron/endpoint.go
index 745fd04,0000000..8cbeadb
mode 100644,000000..100644
--- a/electron/endpoint.go
+++ b/electron/endpoint.go
@@@ -1,68 -1,0 +1,94 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "io"
- "qpid.apache.org/internal"
+ "qpid.apache.org/proton"
+)
+
+// Closed is an alias for io.EOF. It is returned as an error when an endpoint
+// was closed cleanly.
+var Closed = io.EOF
+
+// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver.
+//
+// Endpoints can be created locally or by the remote peer. You must Open() an
+// endpoint before you can use it. Some endpoints have additional Set*() methods
+// that must be called before Open() to take effect, see Connection, Session,
+// Link, Sender and Receiver for details.
+//
+type Endpoint interface {
+ // Close an endpoint and signal an error to the remote end if error != nil.
+ Close(error)
+
+ // String is a human readable identifier, useful for debugging and logging.
+ String() string
+
+ // Error returns nil if the endpoint is open, otherwise returns an error.
+ // Error() == Closed means the endpoint was closed without error.
+ Error() error
+
+ // Connection containing the endpoint
+ Connection() Connection
++
++ // Done returns a channel that will close when the endpoint closes.
++ // Error() will contain the reason.
++ Done() <-chan struct{}
+}
+
++// DEVELOPER NOTES
++//
++// An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated
++//
+type endpoint struct {
- err internal.ErrorHolder
- str string // Must be set by the value that embeds endpoint.
++ err proton.ErrorHolder
++ str string // Must be set by the value that embeds endpoint.
++ done chan struct{}
++}
++
++func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} }
++
++// Called in handler on a Closed event. Marks the endpoint as closed and the corresponding
++// proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure
++// the pointer has not been invalidated.
++//
++// Returns the error stored on the endpoint, which may not be different to err if there was
++// already a n error
++func (e *endpoint) closed(err error) error {
++ e.err.Set(err)
++ e.err.Set(Closed)
++ close(e.done)
++ return e.err.Get()
+}
+
+func (e *endpoint) String() string { return e.str }
- func (e *endpoint) Error() error { return e.err.Get() }
+
- // Call in proton goroutine to close an endpoint locally
++func (e *endpoint) Error() error { return e.err.Get() }
++
++func (e *endpoint) Done() <-chan struct{} { return e.done }
++
++// Call in proton goroutine to initiate closing an endpoint locally
+// handler will complete the close when remote end closes.
+func localClose(ep proton.Endpoint, err error) {
+ if ep.State().LocalActive() {
+ proton.CloseError(ep, err)
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/error.go
----------------------------------------------------------------------
diff --cc electron/error.go
index 0000000,0000000..4dcfd94
new file mode 100644
--- /dev/null
+++ b/electron/error.go
@@@ -1,0 -1,0 +1,35 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "fmt"
++)
++
++// assert panics if condition is false with optional formatted message
++func assert(condition bool, format ...interface{}) {
++ if !condition {
++ if len(format) > 0 {
++ panic(fmt.Errorf(format[0].(string), format[1:]...))
++ } else {
++ panic(fmt.Errorf("assertion failed"))
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/handler.go
----------------------------------------------------------------------
diff --cc electron/handler.go
index b518e42,0000000..0237156
mode 100644,000000..100644
--- a/electron/handler.go
+++ b/electron/handler.go
@@@ -1,158 -1,0 +1,187 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "qpid.apache.org/amqp"
+ "qpid.apache.org/proton"
+)
+
+// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
+
+type handler struct {
+ delegator *proton.MessagingAdapter
+ connection *connection
+ links map[proton.Link]Link
- sentMessages map[proton.Delivery]*sentMessage
++ sentMessages map[proton.Delivery]sentMessage
+ sessions map[proton.Session]*session
+}
+
+func newHandler(c *connection) *handler {
+ h := &handler{
+ connection: c,
+ links: make(map[proton.Link]Link),
- sentMessages: make(map[proton.Delivery]*sentMessage),
++ sentMessages: make(map[proton.Delivery]sentMessage),
+ sessions: make(map[proton.Session]*session),
+ }
+ h.delegator = proton.NewMessagingAdapter(h)
+ // Disable auto features of MessagingAdapter, we do these ourselves.
+ h.delegator.Prefetch = 0
+ h.delegator.AutoAccept = false
+ h.delegator.AutoSettle = false
+ h.delegator.AutoOpen = false
+ return h
+}
+
- func (h *handler) internalError(fmt string, arg ...interface{}) {
- proton.CloseError(h.connection.eConnection, amqp.Errorf(amqp.InternalError, fmt, arg...))
++func (h *handler) linkError(l proton.Link, msg string) {
++ proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l))
+}
+
+func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
+ switch t {
+
+ case proton.MMessage:
+ if r, ok := h.links[e.Link()].(*receiver); ok {
+ r.message(e.Delivery())
+ } else {
- h.internalError("no receiver for link %s", e.Link())
++ h.linkError(e.Link(), "no receiver")
+ }
+
+ case proton.MSettled:
- if sm := h.sentMessages[e.Delivery()]; sm != nil {
- sm.settled(nil)
++ if sm, ok := h.sentMessages[e.Delivery()]; ok {
++ d := e.Delivery().Remote()
++ sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value}
++ delete(h.sentMessages, e.Delivery())
+ }
+
+ case proton.MSendable:
+ if s, ok := h.links[e.Link()].(*sender); ok {
+ s.sendable()
+ } else {
- h.internalError("no receiver for link %s", e.Link())
++ h.linkError(e.Link(), "no sender")
+ }
+
+ case proton.MSessionOpening:
+ if e.Session().State().LocalUninit() { // Remotely opened
- incoming := &IncomingSession{h: h, pSession: e.Session()}
- h.connection.accept(incoming)
- if err := incoming.error(); err != nil {
- proton.CloseError(e.Session(), err)
- }
++ h.incoming(newIncomingSession(h, e.Session()))
+ }
+
+ case proton.MSessionClosed:
- err := proton.EndpointError(e.Session())
- for l, _ := range h.links {
- if l.Session() == e.Session() {
- h.linkClosed(l, err)
- }
- }
- delete(h.sessions, e.Session())
++ h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
+
+ case proton.MLinkOpening:
+ l := e.Link()
+ if l.State().LocalActive() { // Already opened locally.
+ break
+ }
+ ss := h.sessions[l.Session()]
+ if ss == nil {
- h.internalError("no session for link %s", e.Link())
++ h.linkError(e.Link(), "no session")
+ break
+ }
- var incoming Incoming
+ if l.IsReceiver() {
- incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
++ h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
+ } else {
- incoming = &IncomingSender{makeIncomingLink(ss, l)}
- }
- h.connection.accept(incoming)
- if err := incoming.error(); err != nil {
- proton.CloseError(l, err)
- break
++ h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
+ }
+
+ case proton.MLinkClosing:
+ e.Link().Close()
+
+ case proton.MLinkClosed:
+ h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
+
+ case proton.MConnectionClosing:
+ h.connection.err.Set(e.Connection().RemoteCondition().Error())
+
+ case proton.MConnectionClosed:
- h.connection.err.Set(Closed) // If no error already set, this is an orderly close.
++ h.connectionClosed(proton.EndpointError(e.Connection()))
+
+ case proton.MDisconnected:
+ h.connection.err.Set(e.Transport().Condition().Error())
+ // If err not set at this point (e.g. to Closed) then this is unexpected.
+ h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
+
+ err := h.connection.Error()
++
+ for l, _ := range h.links {
+ h.linkClosed(l, err)
+ }
++ h.links = nil
+ for _, s := range h.sessions {
+ s.closed(err)
+ }
++ h.sessions = nil
+ for _, sm := range h.sentMessages {
- sm.settled(err)
++ sm.ack <- Outcome{Unacknowledged, err, sm.value}
+ }
++ h.sentMessages = nil
++ }
++}
++
++func (h *handler) incoming(in Incoming) {
++ var err error
++ if h.connection.incoming != nil {
++ h.connection.incoming <- in
++ err = in.wait()
++ } else {
++ err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",
++ in.pEndpoint().Type(), in.pEndpoint().String())
++ }
++ if err == nil {
++ in.pEndpoint().Open()
++ } else {
++ proton.CloseError(in.pEndpoint(), err)
+ }
+}
+
++func (h *handler) addLink(pl proton.Link, el Link) {
++ h.links[pl] = el
++}
++
+func (h *handler) linkClosed(l proton.Link, err error) {
- if link := h.links[l]; link != nil {
++ if link, ok := h.links[l]; ok {
+ link.closed(err)
+ delete(h.links, l)
+ }
+}
+
- func (h *handler) addLink(rl proton.Link, ll Link) {
- h.links[rl] = ll
++func (h *handler) sessionClosed(ps proton.Session, err error) {
++ if s, ok := h.sessions[ps]; ok {
++ delete(h.sessions, ps)
++ err = s.closed(err)
++ for l, _ := range h.links {
++ if l.Session() == ps {
++ h.linkClosed(l, err)
++ }
++ }
++ }
++}
++
++func (h *handler) connectionClosed(err error) {
++ err = h.connection.closed(err)
++ // Close links first to avoid repeated scans of the link list by sessions.
++ for l, _ := range h.links {
++ h.linkClosed(l, err)
++ }
++ for s, _ := range h.sessions {
++ h.sessionClosed(s, err)
++ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org