You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/23 18:58:18 UTC

[50/50] [abbrv] qpid-proton git commit: Merge branch 'master' into go1

Merge branch 'master' into go1

# Conflicts:
#	readme-branch.md


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9e788b2d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9e788b2d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9e788b2d

Branch: refs/heads/go1
Commit: 9e788b2d484a79154d9d05a93bd6447730ab7738
Parents: e1a83ee 1cfa056
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Nov 23 12:36:33 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 23 12:52:44 2015 -0500

----------------------------------------------------------------------
 amqp/error.go                |  39 ++-
 amqp/interop                 |   1 -
 amqp/marshal.go              |  10 +-
 amqp/message.go              |   9 +-
 amqp/types.go                |   6 +-
 amqp/unmarshal.go            |  41 +--
 amqp/url.go                  |   6 +-
 amqp/url_test.go             |   2 +-
 electron/connection.go       | 128 ++++++----
 electron/container.go        |  20 +-
 electron/doc.go              |  14 +-
 electron/endpoint.go         |  36 ++-
 electron/error.go            |  35 +++
 electron/handler.go          |  97 ++++---
 electron/link.go             |  86 ++++---
 electron/messaging_test.go   | 182 +++++++-------
 electron/receiver.go         |  60 +++--
 electron/sender.go           | 375 ++++++++++++---------------
 electron/session.go          |  73 +++---
 electron/time.go             |  13 +-
 internal/error.go            | 118 ---------
 internal/flexchannel.go      |  82 ------
 internal/flexchannel_test.go |  89 -------
 internal/safemap.go          |  57 -----
 internal/uuid.go             |  70 ------
 proton/engine.go             |  27 +-
 proton/error.go              | 104 ++++----
 proton/handlers.go           |   7 +-
 proton/interop_test.go       | 290 ---------------------
 proton/marshal.go            | 210 ----------------
 proton/message.go            |  29 ++-
 proton/types.go              | 151 -----------
 proton/unfinished.go         |  53 ----
 proton/unmarshal.go          | 517 --------------------------------------
 proton/url.go                |  96 -------
 proton/url_test.go           |  51 ----
 proton/uuid.go               |  57 +++++
 proton/wrappers.go           |  39 +--
 proton/wrappers_gen.go       |   9 +-
 readme-go-get.md             |  18 ++
 40 files changed, 873 insertions(+), 2434 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/error.go
----------------------------------------------------------------------
diff --cc amqp/error.go
index 868dbf3,0000000..349fc41
mode 100644,000000..100644
--- a/amqp/error.go
+++ b/amqp/error.go
@@@ -1,66 -1,0 +1,103 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
++// #include <proton/error.h>
++import "C"
++
 +import (
 +	"fmt"
 +	"reflect"
 +)
 +
 +// Error is an AMQP error condition. It has a name and a description.
 +// It implements the Go error interface so can be returned as an error value.
 +//
 +// You can pass amqp.Error to methods that pass an error to a remote endpoint,
 +// this gives you full control over what the remote endpoint will see.
 +//
 +// You can also pass any Go error to such functions, the remote peer
 +// will see the equivalent of MakeError(error)
 +//
 +type Error struct{ Name, Description string }
 +
 +// Error implements the Go error interface for AMQP error conditions.
- func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) }
++func (c Error) Error() string { return fmt.Sprintf("%s: %s", c.Name, c.Description) }
 +
 +// Errorf makes an Error with name and formatted description as per fmt.Sprintf
 +func Errorf(name, format string, arg ...interface{}) Error {
 +	return Error{name, fmt.Sprintf(format, arg...)}
 +}
 +
 +// MakeError makes an AMQP error from a Go error. If err is already an
 +// amqp.Error it is returned unchanged, so its name and description are
 +// preserved. Otherwise the Go type of err is used as the name and the
 +// err.Error() string as the description.
 +//
 +// err must not be nil.
 +func MakeError(err error) Error {
 +	if amqpErr, ok := err.(Error); ok {
 +		return amqpErr
 +	}
 +	errType := reflect.TypeOf(err)
 +	name := errType.Name()
 +	if name == "" {
 +		// Name() is empty for pointer and other unnamed types, which covers
 +		// most standard library errors (e.g. *errors.errorString); fall back
 +		// to the full type string so the name is never blank.
 +		name = errType.String()
 +	}
 +	return Error{name, err.Error()}
 +}
 +
 +// AMQP 1.0 error condition names.
 +//
 +// ResourceLimit and PreerrorFailed keep their existing Go identifiers but
 +// carry the condition symbols defined by the AMQP 1.0 specification
 +// ("amqp:resource-limit-exceeded" and "amqp:precondition-failed").
 +var (
 +	InternalError      = "amqp:internal-error"
 +	NotFound           = "amqp:not-found"
 +	UnauthorizedAccess = "amqp:unauthorized-access"
 +	DecodeError        = "amqp:decode-error"
 +	ResourceLimit      = "amqp:resource-limit-exceeded"
 +	NotAllowed         = "amqp:not-allowed"
 +	InvalidField       = "amqp:invalid-field"
 +	NotImplemented     = "amqp:not-implemented"
 +	ResourceLocked     = "amqp:resource-locked"
 +	PreerrorFailed     = "amqp:precondition-failed"
 +	ResourceDeleted    = "amqp:resource-deleted"
 +	IllegalState       = "amqp:illegal-state"
 +	FrameSizeTooSmall  = "amqp:frame-size-too-small"
 +)
++
++type PnErrorCode int
++
++func (e PnErrorCode) String() string {
++	switch e {
++	case C.PN_EOS:
++		return "end-of-data"
++	case C.PN_ERR:
++		return "error"
++	case C.PN_OVERFLOW:
++		return "overflow"
++	case C.PN_UNDERFLOW:
++		return "underflow"
++	case C.PN_STATE_ERR:
++		return "bad-state"
++	case C.PN_ARG_ERR:
++		return "invalid-argument"
++	case C.PN_TIMEOUT:
++		return "timeout"
++	case C.PN_INTR:
++		return "interrupted"
++	case C.PN_INPROGRESS:
++		return "in-progress"
++	default:
++		return fmt.Sprintf("unknown-error(%d)", e)
++	}
++}
++
++func PnError(e *C.pn_error_t) error {
++	if e == nil || C.pn_error_code(e) == 0 {
++		return nil
++	}
++	return fmt.Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e)))
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/marshal.go
----------------------------------------------------------------------
diff --cc amqp/marshal.go
index 666b4f6,0000000..9930e13
mode 100644,000000..100644
--- a/amqp/marshal.go
+++ b/amqp/marshal.go
@@@ -1,250 -1,0 +1,250 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
++	"fmt"
 +	"io"
- 	"qpid.apache.org/internal"
 +	"reflect"
 +	"unsafe"
 +)
 +
 +func dataError(prefix string, data *C.pn_data_t) error {
- 	err := internal.PnError(unsafe.Pointer(C.pn_data_error(data)))
++	err := PnError(C.pn_data_error(data))
 +	if err != nil {
- 		err = internal.Errorf("%s: %s", prefix, err.(internal.Error))
++		err = fmt.Errorf("%s: %s", prefix, err.Error())
 +	}
 +	return err
 +}
 +
 +/*
 +Marshal encodes a Go value as AMQP data in buffer.
 +If buffer is nil, or is not large enough, a new buffer is created.
 +
 +Returns the buffer used for encoding with len() adjusted to the actual size of data.
 +
 +Go types are encoded as follows
 +
 + +-------------------------------------+--------------------------------------------+
 + |Go type                              |AMQP type                                   |
 + +-------------------------------------+--------------------------------------------+
 + |bool                                 |bool                                        |
 + +-------------------------------------+--------------------------------------------+
 + |int8, int16, int32, int64 (int)      |byte, short, int, long (int or long)        |
 + +-------------------------------------+--------------------------------------------+
 + |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong)  |
 + +-------------------------------------+--------------------------------------------+
 + |float32, float64                     |float, double.                              |
 + +-------------------------------------+--------------------------------------------+
 + |string                               |string                                      |
 + +-------------------------------------+--------------------------------------------+
 + |[]byte, Binary                       |binary                                      |
 + +-------------------------------------+--------------------------------------------+
 + |Symbol                               |symbol                                      |
 + +-------------------------------------+--------------------------------------------+
 + |interface{}                          |the contained type                          |
 + +-------------------------------------+--------------------------------------------+
 + |nil                                  |null                                        |
 + +-------------------------------------+--------------------------------------------+
 + |map[K]T                              |map with K and T converted as above         |
 + +-------------------------------------+--------------------------------------------+
 + |Map                                  |map, may have mixed types for keys, values  |
 + +-------------------------------------+--------------------------------------------+
 + |[]T                                  |list with T converted as above              |
 + +-------------------------------------+--------------------------------------------+
 + |List                                 |list, may have mixed types for values       |
 + +-------------------------------------+--------------------------------------------+
 +
 +The following Go types cannot be marshaled: uintptr, function, interface, channel
 +
 +TODO
 +
 +Go types: array, slice, struct, complex64/128.
 +
 +AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
 +
 +Described types.
 +
 +*/
 +func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
 +	defer doRecover(&err)
 +	data := C.pn_data(0)
 +	defer C.pn_data_free(data)
 +	marshal(v, data)
 +	encode := func(buf []byte) ([]byte, error) {
 +		n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
 +		switch {
 +		case n == int(C.PN_OVERFLOW):
 +			return buf, overflow
 +		case n < 0:
 +			return buf, dataError("marshal error", data)
 +		default:
 +			return buf[:n], nil
 +		}
 +	}
 +	return encodeGrow(buffer, encode)
 +}
 +
 +const minEncode = 256
 +
 +// overflow is returned when an encoding function can't fit data in the buffer.
- var overflow = internal.Errorf("buffer too small")
++var overflow = fmt.Errorf("buffer too small")
 +
 +// encodeFn encodes into buffer[0:len(buffer)].
 +// Returns buffer with length adjusted for data encoded.
 +// If buffer too small, returns overflow as error.
 +type encodeFn func(buffer []byte) ([]byte, error)
 +
 +// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
 +// Returns the final buffer.
 +func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
 +	if buffer == nil || len(buffer) == 0 {
 +		buffer = make([]byte, minEncode)
 +	}
 +	var err error
 +	for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
 +		buffer = make([]byte, 2*len(buffer))
 +	}
 +	return buffer, err
 +}
 +
 +func marshal(v interface{}, data *C.pn_data_t) {
 +	switch v := v.(type) {
 +	case nil:
 +		C.pn_data_put_null(data)
 +	case bool:
 +		C.pn_data_put_bool(data, C.bool(v))
 +	case int8:
 +		C.pn_data_put_byte(data, C.int8_t(v))
 +	case int16:
 +		C.pn_data_put_short(data, C.int16_t(v))
 +	case int32:
 +		C.pn_data_put_int(data, C.int32_t(v))
 +	case int64:
 +		C.pn_data_put_long(data, C.int64_t(v))
 +	case int:
 +		if unsafe.Sizeof(0) == 8 {
 +			C.pn_data_put_long(data, C.int64_t(v))
 +		} else {
 +			C.pn_data_put_int(data, C.int32_t(v))
 +		}
 +	case uint8:
 +		C.pn_data_put_ubyte(data, C.uint8_t(v))
 +	case uint16:
 +		C.pn_data_put_ushort(data, C.uint16_t(v))
 +	case uint32:
 +		C.pn_data_put_uint(data, C.uint32_t(v))
 +	case uint64:
 +		C.pn_data_put_ulong(data, C.uint64_t(v))
 +	case uint:
 +		if unsafe.Sizeof(0) == 8 {
 +			C.pn_data_put_ulong(data, C.uint64_t(v))
 +		} else {
 +			C.pn_data_put_uint(data, C.uint32_t(v))
 +		}
 +	case float32:
 +		C.pn_data_put_float(data, C.float(v))
 +	case float64:
 +		C.pn_data_put_double(data, C.double(v))
 +	case string:
 +		C.pn_data_put_string(data, pnBytes([]byte(v)))
 +	case []byte:
 +		C.pn_data_put_binary(data, pnBytes(v))
 +	case Binary:
 +		C.pn_data_put_binary(data, pnBytes([]byte(v)))
 +	case Symbol:
 +		C.pn_data_put_symbol(data, pnBytes([]byte(v)))
 +	case Map: // Special map type
 +		C.pn_data_put_map(data)
 +		C.pn_data_enter(data)
 +		for key, val := range v {
 +			marshal(key, data)
 +			marshal(val, data)
 +		}
 +		C.pn_data_exit(data)
 +	default:
 +		switch reflect.TypeOf(v).Kind() {
 +		case reflect.Map:
 +			putMap(data, v)
 +		case reflect.Slice:
 +			putList(data, v)
 +		default:
- 			panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
++			panic(fmt.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
 +		}
 +	}
 +	err := dataError("marshal", data)
 +	if err != nil {
 +		panic(err)
 +	}
 +	return
 +}
 +
 +func clearMarshal(v interface{}, data *C.pn_data_t) {
 +	C.pn_data_clear(data)
 +	marshal(v, data)
 +}
 +
 +func putMap(data *C.pn_data_t, v interface{}) {
 +	mapValue := reflect.ValueOf(v)
 +	C.pn_data_put_map(data)
 +	C.pn_data_enter(data)
 +	for _, key := range mapValue.MapKeys() {
 +		marshal(key.Interface(), data)
 +		marshal(mapValue.MapIndex(key).Interface(), data)
 +	}
 +	C.pn_data_exit(data)
 +}
 +
 +func putList(data *C.pn_data_t, v interface{}) {
 +	listValue := reflect.ValueOf(v)
 +	C.pn_data_put_list(data)
 +	C.pn_data_enter(data)
 +	for i := 0; i < listValue.Len(); i++ {
 +		marshal(listValue.Index(i).Interface(), data)
 +	}
 +	C.pn_data_exit(data)
 +}
 +
 +// Encoder encodes AMQP values to an io.Writer
 +type Encoder struct {
 +	writer io.Writer
 +	buffer []byte
 +}
 +
 +// NewEncoder returns a new Encoder that writes to w.
 +func NewEncoder(w io.Writer) *Encoder {
 +	return &Encoder{w, make([]byte, minEncode)}
 +}
 +
 +// Encode marshals v as AMQP data and writes the encoded bytes to the
 +// underlying writer. It returns the Marshal error if encoding fails,
 +// otherwise any error reported by the writer.
 +func (e *Encoder) Encode(v interface{}) (err error) {
 +	e.buffer, err = Marshal(v, e.buffer)
 +	if err != nil {
 +		return err
 +	}
 +	// Propagate write failures instead of silently dropping them.
 +	_, err = e.writer.Write(e.buffer)
 +	return err
 +}
 +
 +func replace(data *C.pn_data_t, v interface{}) {
 +	C.pn_data_clear(data)
 +	marshal(v, data)
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/message.go
----------------------------------------------------------------------
diff --cc amqp/message.go
index 5ba4f4f,0000000..e36c6f2
mode 100644,000000..100644
--- a/amqp/message.go
+++ b/amqp/message.go
@@@ -1,347 -1,0 +1,346 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/types.h>
 +// #include <proton/message.h>
 +// #include <proton/codec.h>
 +// #include <stdlib.h>
 +//
 +// /* Helper for setting message string fields */
 +// typedef int (*set_fn)(pn_message_t*, const char*);
 +// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
 +//     int result = set(m, s);
 +//     free(s);
 +//     return result;
 +// }
 +//
 +import "C"
 +
 +import (
- 	"qpid.apache.org/internal"
++	"fmt"
 +	"runtime"
 +	"time"
 +	"unsafe"
 +)
 +
 +// Message is the interface to an AMQP message.
 +type Message interface {
 +	// Durable indicates that any parties taking responsibility
 +	// for the message must durably store the content.
 +	Durable() bool
 +	SetDurable(bool)
 +
 +	// Priority impacts ordering guarantees. Within a
 +	// given ordered context, higher priority messages may jump ahead of
 +	// lower priority messages.
 +	Priority() uint8
 +	SetPriority(uint8)
 +
 +	// TTL or Time To Live, a message may be dropped after this duration.
 +	TTL() time.Duration
 +	SetTTL(time.Duration)
 +
 +	// FirstAcquirer indicates
 +	// that the recipient of the message is the first recipient to acquire
 +	// the message, i.e. there have been no failed delivery attempts to
 +	// other acquirers. Note that this does not mean the message has not
 +	// been delivered to, but not acquired, by other recipients.
 +	FirstAcquirer() bool
 +	SetFirstAcquirer(bool)
 +
 +	// DeliveryCount tracks how many attempts have been made to
 +	// deliver a message.
 +	DeliveryCount() uint32
 +	SetDeliveryCount(uint32)
 +
 +	// MessageId provides a unique identifier for a message.
 +	// It can be a string, an unsigned long, a uuid or a
 +	// binary value.
 +	MessageId() interface{}
 +	SetMessageId(interface{})
 +
 +	UserId() string
 +	SetUserId(string)
 +
 +	Address() string
 +	SetAddress(string)
 +
 +	Subject() string
 +	SetSubject(string)
 +
 +	ReplyTo() string
 +	SetReplyTo(string)
 +
 +	// CorrelationId is set on correlated request and response messages. It can be
 +	// a string, an unsigned long, a uuid or a binary value.
 +	CorrelationId() interface{}
 +	SetCorrelationId(interface{})
 +
 +	ContentType() string
 +	SetContentType(string)
 +
 +	ContentEncoding() string
 +	SetContentEncoding(string)
 +
 +	// ExpiryTime indicates an absolute time when the message may be dropped.
 +	// A zero time (i.e. t.IsZero() == true) indicates a message never expires.
 +	ExpiryTime() time.Time
 +	SetExpiryTime(time.Time)
 +
 +	CreationTime() time.Time
 +	SetCreationTime(time.Time)
 +
 +	GroupId() string
 +	SetGroupId(string)
 +
 +	GroupSequence() int32
 +	SetGroupSequence(int32)
 +
 +	ReplyToGroupId() string
 +	SetReplyToGroupId(string)
 +
 +	// Instructions - AMQP delivery instructions.
 +	Instructions() map[string]interface{}
 +	SetInstructions(v map[string]interface{})
 +
 +	// Annotations - AMQP annotations.
 +	Annotations() map[string]interface{}
 +	SetAnnotations(v map[string]interface{})
 +
 +	// Properties - Application properties.
 +	Properties() map[string]interface{}
 +	SetProperties(v map[string]interface{})
 +
 +	// Inferred indicates how the message content
 +	// is encoded into AMQP sections. If inferred is true then binary and
 +	// list values in the body of the message will be encoded as AMQP DATA
 +	// and AMQP SEQUENCE sections, respectively. If inferred is false,
 +	// then all values in the body of the message will be encoded as AMQP
 +	// VALUE sections regardless of their type.
 +	Inferred() bool
 +	SetInferred(bool)
 +
 +	// Marshal a Go value into the message body. See amqp.Marshal() for details.
 +	Marshal(interface{})
 +
 +	// Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
 +	Unmarshal(interface{})
 +
 +	// Body value resulting from the default unmarshalling of message body as interface{}
 +	Body() interface{}
 +
 +	// Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
 +	// the message is encoded into it, otherwise a new buffer is created.
 +	// Returns the buffer containing the message.
 +	Encode(buffer []byte) ([]byte, error)
 +
 +	// Decode data into this message. Overwrites an existing message content.
 +	Decode(buffer []byte) error
 +
 +	// Clear the message contents.
 +	Clear()
 +
 +	// Copy the contents of another message to this one.
 +	Copy(m Message) error
 +}
 +
 +type message struct{ pn *C.pn_message_t }
 +
 +func freeMessage(m *message) {
 +	C.pn_message_free(m.pn)
 +	m.pn = nil
 +}
 +
 +// NewMessage creates a new message instance.
 +func NewMessage() Message {
 +	m := &message{C.pn_message()}
 +	runtime.SetFinalizer(m, freeMessage)
 +	return m
 +}
 +
 +// NewMessageWith creates a message with value as the body. Equivalent to
 +//     m := NewMessage(); m.Marshal(body)
 +func NewMessageWith(value interface{}) Message {
 +	m := NewMessage()
 +	m.Marshal(value)
 +	return m
 +}
 +
 +func (m *message) Clear() { C.pn_message_clear(m.pn) }
 +
 +// Copy replaces the contents of m with those of x by encoding x to AMQP
 +// data and decoding that data into m. An encoding error is returned as-is.
 +func (m *message) Copy(x Message) error {
 +	encoded, err := x.Encode(nil)
 +	if err != nil {
 +		return err
 +	}
 +	return m.Decode(encoded)
 +}
 +
 +// ==== message get functions
 +
 +func rewindGet(data *C.pn_data_t) (v interface{}) {
 +	C.pn_data_rewind(data)
 +	C.pn_data_next(data)
 +	unmarshal(&v, data)
 +	return v
 +}
 +
 +func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
 +	C.pn_data_rewind(data)
 +	C.pn_data_next(data)
 +	unmarshal(&v, data)
 +	return v
 +}
 +
 +func (m *message) Inferred() bool  { return bool(C.pn_message_is_inferred(m.pn)) }
 +func (m *message) Durable() bool   { return bool(C.pn_message_is_durable(m.pn)) }
 +func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
 +func (m *message) TTL() time.Duration {
 +	return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
 +}
 +func (m *message) FirstAcquirer() bool        { return bool(C.pn_message_is_first_acquirer(m.pn)) }
 +func (m *message) DeliveryCount() uint32      { return uint32(C.pn_message_get_delivery_count(m.pn)) }
 +func (m *message) MessageId() interface{}     { return rewindGet(C.pn_message_id(m.pn)) }
 +func (m *message) UserId() string             { return goString(C.pn_message_get_user_id(m.pn)) }
 +func (m *message) Address() string            { return C.GoString(C.pn_message_get_address(m.pn)) }
 +func (m *message) Subject() string            { return C.GoString(C.pn_message_get_subject(m.pn)) }
 +func (m *message) ReplyTo() string            { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
 +func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
 +func (m *message) ContentType() string        { return C.GoString(C.pn_message_get_content_type(m.pn)) }
 +func (m *message) ContentEncoding() string    { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
 +
 +// ExpiryTime returns the absolute expiry time of the message. A proton
 +// expiry timestamp of 0 is reported as the zero time.Time (IsZero() == true),
 +// which the Message interface documents as "never expires"; previously it
 +// was reported as the Unix epoch, which is not a zero time.Time.
 +func (m *message) ExpiryTime() time.Time {
 +	msecs := int64(C.pn_message_get_expiry_time(m.pn))
 +	if msecs == 0 {
 +		return time.Time{}
 +	}
 +	return time.Unix(0, msecs*int64(time.Millisecond))
 +}
 +func (m *message) CreationTime() time.Time {
 +	return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
 +}
 +func (m *message) GroupId() string        { return C.GoString(C.pn_message_get_group_id(m.pn)) }
 +func (m *message) GroupSequence() int32   { return int32(C.pn_message_get_group_sequence(m.pn)) }
 +func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
 +
 +func (m *message) Instructions() map[string]interface{} {
 +	return rewindMap(C.pn_message_instructions(m.pn))
 +}
 +func (m *message) Annotations() map[string]interface{} {
 +	return rewindMap(C.pn_message_annotations(m.pn))
 +}
 +func (m *message) Properties() map[string]interface{} {
 +	return rewindMap(C.pn_message_properties(m.pn))
 +}
 +
 +// ==== message set methods
 +
 +func setData(v interface{}, data *C.pn_data_t) {
 +	C.pn_data_clear(data)
 +	marshal(v, data)
 +}
 +
 +// dataString returns the pn_inspect() rendering of data as a Go string.
 +func dataString(data *C.pn_data_t) string {
 +	// C.CString allocates with malloc and the result is not freed by
 +	// pn_free(str) below, so release it explicitly to avoid leaking on
 +	// every call. NOTE(review): relies on pn_string() copying its argument.
 +	empty := C.CString("")
 +	defer C.free(unsafe.Pointer(empty))
 +	str := C.pn_string(empty)
 +	defer C.pn_free(unsafe.Pointer(str))
 +	C.pn_inspect(unsafe.Pointer(data), str)
 +	return C.GoString(C.pn_string_get(str))
 +}
 +
 +// SetInferred sets the message's inferred flag to b.
 +func (m *message) SetInferred(b bool)  { C.pn_message_set_inferred(m.pn, C.bool(b)) }
 +func (m *message) SetDurable(b bool)   { C.pn_message_set_durable(m.pn, C.bool(b)) }
 +func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
 +func (m *message) SetTTL(d time.Duration) {
 +	C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
 +}
 +func (m *message) SetFirstAcquirer(b bool)     { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
 +func (m *message) SetDeliveryCount(c uint32)   { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
 +func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
 +func (m *message) SetUserId(s string)          { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
 +func (m *message) SetAddress(s string) {
 +	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
 +}
 +func (m *message) SetSubject(s string) {
 +	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
 +}
 +func (m *message) SetReplyTo(s string) {
 +	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
 +}
 +func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
 +func (m *message) SetContentType(s string) {
 +	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
 +}
 +func (m *message) SetContentEncoding(s string) {
 +	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
 +}
 +func (m *message) SetExpiryTime(t time.Time)   { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
 +func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
 +func (m *message) SetGroupId(s string) {
 +	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
 +}
 +func (m *message) SetGroupSequence(s int32) {
 +	C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
 +}
 +func (m *message) SetReplyToGroupId(s string) {
 +	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
 +}
 +
 +func (m *message) SetInstructions(v map[string]interface{}) {
 +	setData(v, C.pn_message_instructions(m.pn))
 +}
 +func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) }
 +func (m *message) SetProperties(v map[string]interface{})  { setData(v, C.pn_message_properties(m.pn)) }
 +
 +// Marshal/Unmarshal body
 +func (m *message) Marshal(v interface{})   { clearMarshal(v, C.pn_message_body(m.pn)) }
 +func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) }
 +func (m *message) Body() (v interface{})   { m.Unmarshal(&v); return }
 +
 +func (m *message) Decode(data []byte) error {
 +	m.Clear()
 +	if len(data) == 0 {
- 		return internal.Errorf("empty buffer for decode")
++		return fmt.Errorf("empty buffer for decode")
 +	}
 +	if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
- 		return internal.Errorf("decoding message: %s",
- 			internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn))))
++		return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn)))
 +	}
 +	return nil
 +}
 +
 +func DecodeMessage(data []byte) (m Message, err error) {
 +	m = NewMessage()
 +	err = m.Decode(data)
 +	return
 +}
 +
 +func (m *message) Encode(buffer []byte) ([]byte, error) {
 +	encode := func(buf []byte) ([]byte, error) {
 +		len := cLen(buf)
 +		result := C.pn_message_encode(m.pn, cPtr(buf), &len)
 +		switch {
 +		case result == C.PN_OVERFLOW:
 +			return buf, overflow
 +		case result < 0:
- 			return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result))
++			return buf, fmt.Errorf("cannot encode message: %s", PnErrorCode(result))
 +		default:
 +			return buf[:len], nil
 +		}
 +	}
 +	return encodeGrow(buffer, encode)
 +}
 +
 +// TODO aconway 2015-09-14: Multi-section messages.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/types.go
----------------------------------------------------------------------
diff --cc amqp/types.go
index 796da66,0000000..abcff25
mode 100644,000000..100644
--- a/amqp/types.go
+++ b/amqp/types.go
@@@ -1,199 -1,0 +1,203 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
 +	"bytes"
 +	"fmt"
 +	"reflect"
 +	"time"
 +	"unsafe"
 +)
 +
 +type Type C.pn_type_t
 +
++// Older proton versions don't define C.PN_INVALID, so define it here.
++// In C it is pn_type_t(-1), in Go use the bitwise NOT operator to get the same value.
++const pnInvalid = ^C.pn_type_t(0)
++
 +func (t Type) String() string {
 +	switch C.pn_type_t(t) {
 +	case C.PN_NULL:
 +		return "null"
 +	case C.PN_BOOL:
 +		return "bool"
 +	case C.PN_UBYTE:
 +		return "ubyte"
 +	case C.PN_BYTE:
 +		return "byte"
 +	case C.PN_USHORT:
 +		return "ushort"
 +	case C.PN_SHORT:
 +		return "short"
 +	case C.PN_CHAR:
 +		return "char"
 +	case C.PN_UINT:
 +		return "uint"
 +	case C.PN_INT:
 +		return "int"
 +	case C.PN_ULONG:
 +		return "ulong"
 +	case C.PN_LONG:
 +		return "long"
 +	case C.PN_TIMESTAMP:
 +		return "timestamp"
 +	case C.PN_FLOAT:
 +		return "float"
 +	case C.PN_DOUBLE:
 +		return "double"
 +	case C.PN_DECIMAL32:
 +		return "decimal32"
 +	case C.PN_DECIMAL64:
 +		return "decimal64"
 +	case C.PN_DECIMAL128:
 +		return "decimal128"
 +	case C.PN_UUID:
 +		return "uuid"
 +	case C.PN_BINARY:
 +		return "binary"
 +	case C.PN_STRING:
 +		return "string"
 +	case C.PN_SYMBOL:
 +		return "symbol"
 +	case C.PN_DESCRIBED:
 +		return "described"
 +	case C.PN_ARRAY:
 +		return "array"
 +	case C.PN_LIST:
 +		return "list"
 +	case C.PN_MAP:
 +		return "map"
 +	default:
- 		if uint32(t) == uint32(C.PN_INVALID) {
++		if uint32(t) == uint32(pnInvalid) {
 +			return "no-data"
 +		}
 +		return fmt.Sprintf("unknown-type(%d)", t)
 +	}
 +}
 +
 +// Go types
 +var (
 +	bytesType = reflect.TypeOf([]byte{})
 +	valueType = reflect.TypeOf(reflect.Value{})
 +)
 +
 +// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys.
 +
 +// Map is a generic map that can have mixed key and value types and so can represent any AMQP map
 +type Map map[interface{}]interface{}
 +
 +// List is a generic list that can hold mixed values and can represent any AMQP list.
 +//
 +type List []interface{}
 +
 +// Symbol is a string that is encoded as an AMQP symbol
 +type Symbol string
 +
 +func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
 +
 +// Binary is a string that is encoded as an AMQP binary.
 +// It is a string rather than a []byte because []byte is not hashable and can't be used as
 +// a map key; AMQP frequently uses binary types as map keys. It can convert to and from []byte.
 +type Binary string
 +
 +func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
 +
 +// GoString for Map prints values with their types, useful for debugging.
 +func (m Map) GoString() string {
 +	out := &bytes.Buffer{}
 +	fmt.Fprintf(out, "%T{", m)
 +	i := len(m)
 +	for k, v := range m {
 +		fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
 +		i--
 +		if i > 0 {
 +			fmt.Fprint(out, ", ")
 +		}
 +	}
 +	fmt.Fprint(out, "}")
 +	return out.String()
 +}
 +
 +// GoString for List prints values with their types, useful for debugging.
 +// Elements are separated by ", " with no trailing separator.
 +func (l List) GoString() string {
 +	out := &bytes.Buffer{}
 +	fmt.Fprintf(out, "%T{", l)
 +	for i, v := range l {
 +		// Write the separator before every element except the first.
 +		if i > 0 {
 +			fmt.Fprint(out, ", ")
 +		}
 +		fmt.Fprintf(out, "%T(%#v)", v, v)
 +	}
 +	fmt.Fprint(out, "}")
 +	return out.String()
 +}
 +
 +// pnTime converts Go time.Time to Proton millisecond Unix time.
 +//
 +// The zero time.Time converts to timestamp 0 rather than to a large
 +// negative millisecond count.
 +func pnTime(t time.Time) C.pn_timestamp_t {
 +	if t.IsZero() {
 +		return C.pn_timestamp_t(0)
 +	}
 +	// Unix() is the floor of the time in seconds and Nanosecond() is the
 +	// non-negative offset within that second, so the sum is correct for
 +	// times before 1970 as well. This also avoids UnixNano(), whose result
 +	// is undefined when the time does not fit in an int64 of nanoseconds.
 +	msecs := int64(t.Nanosecond()) / int64(time.Millisecond)
 +	return C.pn_timestamp_t(t.Unix()*1000 + msecs)
 +}
 +
 +// goTime converts a pn_timestamp_t to a Go time.Time.
 +func goTime(t C.pn_timestamp_t) time.Time {
 +	secs := int64(t) / 1000
 +	nsecs := (int64(t) % 1000) * int64(time.Millisecond)
 +	return time.Unix(secs, nsecs)
 +}
 +
 +func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
 +	if cBytes.start != nil {
 +		bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
 +	}
 +	return
 +}
 +
 +func goString(cBytes C.pn_bytes_t) (str string) {
 +	if cBytes.start != nil {
 +		str = C.GoStringN(cBytes.start, C.int(cBytes.size))
 +	}
 +	return
 +}
 +
 +// pnBytes builds a pn_bytes_t whose start points at the first element of b
 +// and whose size is len(b); the bytes are not copied. An empty or nil slice
 +// yields a pn_bytes_t with size 0 and a nil start.
 +func pnBytes(b []byte) C.pn_bytes_t {
 +	if n := len(b); n > 0 {
 +		return C.pn_bytes_t{C.size_t(n), (*C.char)(unsafe.Pointer(&b[0]))}
 +	}
 +	return C.pn_bytes_t{0, nil}
 +}
 +
 +func cPtr(b []byte) *C.char {
 +	if len(b) == 0 {
 +		return nil
 +	}
 +	return (*C.char)(unsafe.Pointer(&b[0]))
 +}
 +
 +func cLen(b []byte) C.size_t {
 +	return C.size_t(len(b))
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/unmarshal.go
----------------------------------------------------------------------
diff --cc amqp/unmarshal.go
index 751921d,0000000..25bb519
mode 100644,000000..100644
--- a/amqp/unmarshal.go
+++ b/amqp/unmarshal.go
@@@ -1,558 -1,0 +1,561 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +// #include <proton/codec.h>
 +import "C"
 +
 +import (
 +	"bytes"
 +	"fmt"
 +	"io"
- 	"qpid.apache.org/internal"
 +	"reflect"
 +	"unsafe"
 +)
 +
 +const minDecode = 1024
 +
 +// Error returned if AMQP data cannot be unmarshaled as the desired Go type.
 +type UnmarshalError struct {
 +	// The name of the AMQP type.
 +	AMQPType string
 +	// The Go type.
 +	GoType reflect.Type
 +}
 +
 +func newUnmarshalError(pnType C.pn_type_t, v interface{}) *UnmarshalError {
 +	return &UnmarshalError{Type(pnType).String(), reflect.TypeOf(v)}
 +}
 +
 +func (e UnmarshalError) Error() string {
 +	if e.GoType.Kind() != reflect.Ptr {
- 		return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType)
++		return fmt.Sprintf("cannot unmarshal to type %s, not a pointer", e.GoType)
 +	} else {
- 		return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
++		return fmt.Sprintf("cannot unmarshal AMQP %s to %s", e.AMQPType, e.GoType)
 +	}
 +}
 +
 +func doRecover(err *error) {
 +	r := recover()
 +	switch r := r.(type) {
 +	case nil:
- 	case *UnmarshalError, internal.Error:
- 		*err = r.(error)
++	case *UnmarshalError:
++		*err = r
 +	default:
 +		panic(r)
 +	}
 +}
 +
 +//
 +// Decoding from a pn_data_t
 +//
 +// NOTE: we use panic() to signal a decoding error, which simplifies the decoding logic.
 +// We recover() at the highest possible level - i.e. in the exported Unmarshal or Decode.
 +//
 +
 +// Decoder decodes AMQP values from an io.Reader.
 +//
 +type Decoder struct {
 +	reader io.Reader
 +	buffer bytes.Buffer
 +}
 +
 +// NewDecoder returns a new decoder that reads from r.
 +//
 +// The decoder has its own buffer and may read more data than required for the
 +// AMQP values requested.  Use Buffered to see if there is data left in the
 +// buffer.
 +//
 +func NewDecoder(r io.Reader) *Decoder {
 +	return &Decoder{r, bytes.Buffer{}}
 +}
 +
 +// Buffered returns a reader of the data remaining in the Decoder's buffer. The
 +// reader is valid until the next call to Decode.
 +//
 +func (d *Decoder) Buffered() io.Reader {
 +	return bytes.NewReader(d.buffer.Bytes())
 +}
 +
 +// Decode reads the next AMQP value from the Reader and stores it in the value pointed to by v.
 +//
 +// See the documentation for Unmarshal for details about the conversion of AMQP into a Go value.
 +//
 +func (d *Decoder) Decode(v interface{}) (err error) {
 +	defer doRecover(&err)
 +	data := C.pn_data(0)
 +	defer C.pn_data_free(data)
 +	var n int
- 	for n == 0 && err == nil {
- 		n = decode(data, d.buffer.Bytes())
++	for n == 0 {
++		n, err = decode(data, d.buffer.Bytes())
++		if err != nil {
++			return err
++		}
 +		if n == 0 { // n == 0 means not enough data, read more
 +			err = d.more()
 +		} else {
 +			unmarshal(v, data)
 +		}
 +	}
 +	d.buffer.Next(n)
 +	return
 +}
 +
 +/*
 +Unmarshal decodes AMQP-encoded bytes and stores the result in the value pointed to by v.
 +Types are converted as follows:
 +
 + +---------------------------+----------------------------------------------------------------------+
 + |To Go types                |From AMQP types                                                       |
 + +===========================+======================================================================+
 + |bool                       |bool                                                                  |
 + +---------------------------+----------------------------------------------------------------------+
 + |int, int8, int16,          |Equivalent or smaller signed integer type: byte, short, int, long.    |
 + |int32, int64               |                                                                      |
 + +---------------------------+----------------------------------------------------------------------+
 + |uint, uint8, uint16,       |Equivalent or smaller unsigned integer type: ubyte, ushort, uint,     |
 + |uint32, uint64 types       |ulong                                                                 |
 + +---------------------------+----------------------------------------------------------------------+
 + |float32, float64           |Equivalent or smaller float or double.                                |
 + +---------------------------+----------------------------------------------------------------------+
 + |string, []byte             |string, symbol or binary.                                             |
 + +---------------------------+----------------------------------------------------------------------+
 + |Symbol                     |symbol                                                                |
 + +---------------------------+----------------------------------------------------------------------+
 + |map[K]T                    |map, provided all keys and values can unmarshal to types K, T         |
 + +---------------------------+----------------------------------------------------------------------+
 + |Map                        |map, any AMQP map                                                     |
 + +---------------------------+----------------------------------------------------------------------+
 + |interface{}                |Any AMQP value can be unmarshaled to an interface{} as follows:       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |AMQP Type               |Go Type in interface{}                       |
 + |                           +========================+=============================================+
 + |                           |bool                    |bool                                         |
 + |                           +------------------------+---------------------------------------------+
 + |                           |byte,short,int,long     |int8,int16,int32,int64                       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |ubyte,ushort,uint,ulong |uint8,uint16,uint32,uint64                   |
 + |                           +------------------------+---------------------------------------------+
 + |                           |float, double           |float32, float64                             |
 + |                           +------------------------+---------------------------------------------+
 + |                           |string                  |string                                       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |symbol                  |Symbol                                       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |binary                  |Binary                                       |
 + |                           +------------------------+---------------------------------------------+
 + |                           |null                    |nil                                          |
 + |                           +------------------------+---------------------------------------------+
 + |                           |map                     |Map                                          |
 + |                           +------------------------+---------------------------------------------+
 + |                           |list                    |List                                         |
 + +---------------------------+------------------------+---------------------------------------------+
 +
 +The following Go types cannot be unmarshaled: uintptr, function, interface, channel.
 +
 +TODO
 +
 +Go types: array, struct.
 +
 +AMQP types: decimal32/64/128, char (round trip), timestamp, uuid, array, multi-section message bodies.
 +
 +AMQP maps with mixed/unhashable key types need an alternate representation.
 +
 +Described types.
 +*/
 +func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
 +	defer doRecover(&err)
 +
 +	data := C.pn_data(0)
 +	defer C.pn_data_free(data)
- 	n = decode(data, bytes)
++	n, err = decode(data, bytes)
++	if err != nil {
++		return 0, err
++	}
 +	if n == 0 {
- 		err = internal.Errorf("not enough data")
++		return 0, fmt.Errorf("not enough data")
 +	} else {
 +		unmarshal(v, data)
 +	}
- 	return
++	return n, nil
 +}
 +
 +// more reads more data when we can't parse a complete AMQP type
 +func (d *Decoder) more() error {
 +	var readSize int64 = minDecode
 +	if int64(d.buffer.Len()) > readSize { // Grow by doubling
 +		readSize = int64(d.buffer.Len())
 +	}
 +	var n int64
 +	n, err := d.buffer.ReadFrom(io.LimitReader(d.reader, readSize))
 +	if n == 0 && err == nil { // ReadFrom won't report io.EOF, just returns 0
 +		err = io.EOF
 +	}
 +	return err
 +}
 +
 +// Unmarshal from data into value pointed at by v.
 +func unmarshal(v interface{}, data *C.pn_data_t) {
 +	pnType := C.pn_data_type(data)
 +	switch v := v.(type) {
 +	case *bool:
 +		switch pnType {
 +		case C.PN_BOOL:
 +			*v = bool(C.pn_data_get_bool(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *int8:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int8(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int8(C.pn_data_get_byte(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *uint8:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint8(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint8(C.pn_data_get_ubyte(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *int16:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int16(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int16(C.pn_data_get_byte(data))
 +		case C.PN_SHORT:
 +			*v = int16(C.pn_data_get_short(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *uint16:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint16(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint16(C.pn_data_get_ubyte(data))
 +		case C.PN_USHORT:
 +			*v = uint16(C.pn_data_get_ushort(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *int32:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int32(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int32(C.pn_data_get_byte(data))
 +		case C.PN_SHORT:
 +			*v = int32(C.pn_data_get_short(data))
 +		case C.PN_INT:
 +			*v = int32(C.pn_data_get_int(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	case *uint32:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint32(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint32(C.pn_data_get_ubyte(data))
 +		case C.PN_USHORT:
 +			*v = uint32(C.pn_data_get_ushort(data))
 +		case C.PN_UINT:
 +			*v = uint32(C.pn_data_get_uint(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *int64:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int64(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int64(C.pn_data_get_byte(data))
 +		case C.PN_SHORT:
 +			*v = int64(C.pn_data_get_short(data))
 +		case C.PN_INT:
 +			*v = int64(C.pn_data_get_int(data))
 +		case C.PN_LONG:
 +			*v = int64(C.pn_data_get_long(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *uint64:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint64(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint64(C.pn_data_get_ubyte(data))
 +		case C.PN_USHORT:
 +			*v = uint64(C.pn_data_get_ushort(data))
 +		case C.PN_ULONG:
 +			*v = uint64(C.pn_data_get_ulong(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *int:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = int(C.pn_data_get_char(data))
 +		case C.PN_BYTE:
 +			*v = int(C.pn_data_get_byte(data))
 +		case C.PN_SHORT:
 +			*v = int(C.pn_data_get_short(data))
 +		case C.PN_INT:
 +			*v = int(C.pn_data_get_int(data))
 +		case C.PN_LONG:
 +			if unsafe.Sizeof(0) == 8 {
 +				*v = int(C.pn_data_get_long(data))
 +			} else {
 +				panic(newUnmarshalError(pnType, v))
 +			}
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *uint:
 +		switch pnType {
 +		case C.PN_CHAR:
 +			*v = uint(C.pn_data_get_char(data))
 +		case C.PN_UBYTE:
 +			*v = uint(C.pn_data_get_ubyte(data))
 +		case C.PN_USHORT:
 +			*v = uint(C.pn_data_get_ushort(data))
 +		case C.PN_UINT:
 +			*v = uint(C.pn_data_get_uint(data))
 +		case C.PN_ULONG:
 +			if unsafe.Sizeof(0) == 8 {
 +				*v = uint(C.pn_data_get_ulong(data))
 +			} else {
 +				panic(newUnmarshalError(pnType, v))
 +			}
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *float32:
 +		switch pnType {
 +		case C.PN_FLOAT:
 +			*v = float32(C.pn_data_get_float(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *float64:
 +		switch pnType {
 +		case C.PN_FLOAT:
 +			*v = float64(C.pn_data_get_float(data))
 +		case C.PN_DOUBLE:
 +			*v = float64(C.pn_data_get_double(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *string:
 +		switch pnType {
 +		case C.PN_STRING:
 +			*v = goString(C.pn_data_get_string(data))
 +		case C.PN_SYMBOL:
 +			*v = goString(C.pn_data_get_symbol(data))
 +		case C.PN_BINARY:
 +			*v = goString(C.pn_data_get_binary(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *[]byte:
 +		switch pnType {
 +		case C.PN_STRING:
 +			*v = goBytes(C.pn_data_get_string(data))
 +		case C.PN_SYMBOL:
 +			*v = goBytes(C.pn_data_get_symbol(data))
 +		case C.PN_BINARY:
 +			*v = goBytes(C.pn_data_get_binary(data))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *Binary:
 +		switch pnType {
 +		case C.PN_BINARY:
 +			*v = Binary(goBytes(C.pn_data_get_binary(data)))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *Symbol:
 +		switch pnType {
 +		case C.PN_SYMBOL:
 +			*v = Symbol(goBytes(C.pn_data_get_symbol(data)))
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +
 +	case *interface{}:
 +		getInterface(data, v)
 +
 +	default:
 +		if reflect.TypeOf(v).Kind() != reflect.Ptr {
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +		switch reflect.TypeOf(v).Elem().Kind() {
 +		case reflect.Map:
 +			getMap(data, v)
 +		case reflect.Slice:
 +			getList(data, v)
 +		default:
 +			panic(newUnmarshalError(pnType, v))
 +		}
 +	}
 +	err := dataError("unmarshaling", data)
 +	if err != nil {
 +		panic(err)
 +	}
 +	return
 +}
 +
 +func rewindUnmarshal(v interface{}, data *C.pn_data_t) {
 +	C.pn_data_rewind(data)
 +	C.pn_data_next(data)
 +	unmarshal(v, data)
 +}
 +
 +// Getting into an interface is driven completely by the AMQP type, since the interface{}
 +// target is type-neutral.
 +func getInterface(data *C.pn_data_t, v *interface{}) {
 +	pnType := C.pn_data_type(data)
 +	switch pnType {
- 	// Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t
- 	case C.PN_NULL, C.pn_type_t(C.PN_INVALID): // No data.
++	case C.PN_NULL, C.pn_type_t(pnInvalid): // No data.
 +		*v = nil
 +	case C.PN_BOOL:
 +		*v = bool(C.pn_data_get_bool(data))
 +	case C.PN_UBYTE:
 +		*v = uint8(C.pn_data_get_ubyte(data))
 +	case C.PN_BYTE:
 +		*v = int8(C.pn_data_get_byte(data))
 +	case C.PN_USHORT:
 +		*v = uint16(C.pn_data_get_ushort(data))
 +	case C.PN_SHORT:
 +		*v = int16(C.pn_data_get_short(data))
 +	case C.PN_UINT:
 +		*v = uint32(C.pn_data_get_uint(data))
 +	case C.PN_INT:
 +		*v = int32(C.pn_data_get_int(data))
 +	case C.PN_CHAR:
 +		*v = uint8(C.pn_data_get_char(data))
 +	case C.PN_ULONG:
 +		*v = uint64(C.pn_data_get_ulong(data))
 +	case C.PN_LONG:
 +		*v = int64(C.pn_data_get_long(data))
 +	case C.PN_FLOAT:
 +		*v = float32(C.pn_data_get_float(data))
 +	case C.PN_DOUBLE:
 +		*v = float64(C.pn_data_get_double(data))
 +	case C.PN_BINARY:
 +		*v = Binary(goBytes(C.pn_data_get_binary(data)))
 +	case C.PN_STRING:
 +		*v = goString(C.pn_data_get_string(data))
 +	case C.PN_SYMBOL:
 +		*v = Symbol(goString(C.pn_data_get_symbol(data)))
 +	case C.PN_MAP:
 +		m := make(Map)
 +		unmarshal(&m, data)
 +		*v = m
 +	case C.PN_LIST:
 +		l := make(List, 0)
 +		unmarshal(&l, data)
 +		*v = l
 +	default:
 +		panic(newUnmarshalError(pnType, v))
 +	}
 +}
 +
 +// get into map pointed at by v
 +func getMap(data *C.pn_data_t, v interface{}) {
 +	mapValue := reflect.ValueOf(v).Elem()
 +	mapValue.Set(reflect.MakeMap(mapValue.Type())) // Clear the map
 +	switch pnType := C.pn_data_type(data); pnType {
 +	case C.PN_MAP:
 +		count := int(C.pn_data_get_map(data))
 +		if bool(C.pn_data_enter(data)) {
 +			defer C.pn_data_exit(data)
 +			for i := 0; i < count/2; i++ {
 +				if bool(C.pn_data_next(data)) {
 +					key := reflect.New(mapValue.Type().Key())
 +					unmarshal(key.Interface(), data)
 +					if bool(C.pn_data_next(data)) {
 +						val := reflect.New(mapValue.Type().Elem())
 +						unmarshal(val.Interface(), data)
 +						mapValue.SetMapIndex(key.Elem(), val.Elem())
 +					}
 +				}
 +			}
 +		}
- 		// Note PN_INVALID is defined outside the enum, older Go versions don't consider it a C.pn_type_t
- 	case C.pn_type_t(C.PN_INVALID): // Leave the map empty
++	case C.pn_type_t(pnInvalid): // Leave the map empty
 +	default:
 +		panic(newUnmarshalError(pnType, v))
 +	}
 +}
 +
 +func getList(data *C.pn_data_t, v interface{}) {
 +	pnType := C.pn_data_type(data)
 +	if pnType != C.PN_LIST {
 +		panic(newUnmarshalError(pnType, v))
 +	}
 +	count := int(C.pn_data_get_list(data))
 +	listValue := reflect.MakeSlice(reflect.TypeOf(v).Elem(), count, count)
 +	if bool(C.pn_data_enter(data)) {
 +		for i := 0; i < count; i++ {
 +			if bool(C.pn_data_next(data)) {
 +				val := reflect.New(listValue.Type().Elem())
 +				unmarshal(val.Interface(), data)
 +				listValue.Index(i).Set(val.Elem())
 +			}
 +		}
 +		C.pn_data_exit(data)
 +	}
 +	reflect.ValueOf(v).Elem().Set(listValue)
 +}
 +
 +// decode from bytes.
 +// Return bytes decoded or 0 if we could not decode a complete object.
 +//
- func decode(data *C.pn_data_t, bytes []byte) int {
++func decode(data *C.pn_data_t, bytes []byte) (int, error) {
 +	if len(bytes) == 0 {
- 		return 0
++		return 0, nil
 +	}
 +	n := int(C.pn_data_decode(data, cPtr(bytes), cLen(bytes)))
 +	if n == int(C.PN_UNDERFLOW) {
 +		C.pn_error_clear(C.pn_data_error(data))
- 		return 0
++		return 0, nil
 +	} else if n <= 0 {
- 		panic(internal.Errorf("unmarshal %s", internal.PnErrorCode(n)))
++		return 0, fmt.Errorf("unmarshal %s", PnErrorCode(n))
 +	}
- 	return n
++	return n, nil
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/url.go
----------------------------------------------------------------------
diff --cc amqp/url.go
index 0d0c662,0000000..70545d2
mode 100644,000000..100644
--- a/amqp/url.go
+++ b/amqp/url.go
@@@ -1,96 -1,0 +1,96 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +/*
 +#include <stdlib.h>
 +#include <string.h>
 +#include <proton/url.h>
 +
 +// Helper function for setting URL fields.
 +typedef void (*setter_fn)(pn_url_t* url, const char* value);
 +inline void	set(pn_url_t *url, setter_fn s, const char* value) {
 +  s(url, value);
 +}
 +*/
 +import "C"
 +
 +import (
++	"fmt"
 +	"net"
 +	"net/url"
- 	"qpid.apache.org/internal"
 +	"unsafe"
 +)
 +
 +const (
 +	amqp  string = "amqp"
 +	amqps        = "amqps"
 +)
 +
 +// ParseURL parses an AMQP URL string and returns a *net/url.URL.
 +//
 +// It is more forgiving than net/url.Parse and allows most of the parts of the
 +// URL to be missing, assuming AMQP defaults.
 +//
 +func ParseURL(s string) (u *url.URL, err error) {
 +	cstr := C.CString(s)
 +	defer C.free(unsafe.Pointer(cstr))
 +	pnUrl := C.pn_url_parse(cstr)
 +	if pnUrl == nil {
- 		return nil, internal.Errorf("bad URL %#v", s)
++		return nil, fmt.Errorf("bad URL %#v", s)
 +	}
 +	defer C.pn_url_free(pnUrl)
 +
 +	scheme := C.GoString(C.pn_url_get_scheme(pnUrl))
 +	username := C.GoString(C.pn_url_get_username(pnUrl))
 +	password := C.GoString(C.pn_url_get_password(pnUrl))
 +	host := C.GoString(C.pn_url_get_host(pnUrl))
 +	port := C.GoString(C.pn_url_get_port(pnUrl))
 +	path := C.GoString(C.pn_url_get_path(pnUrl))
 +
 +	if err != nil {
- 		return nil, internal.Errorf("bad URL %#v: %s", s, err)
++		return nil, fmt.Errorf("bad URL %#v: %s", s, err)
 +	}
 +	if scheme == "" {
 +		scheme = amqp
 +	}
 +	if port == "" {
 +		if scheme == amqps {
 +			port = amqps
 +		} else {
 +			port = amqp
 +		}
 +	}
 +	var user *url.Userinfo
 +	if password != "" {
 +		user = url.UserPassword(username, password)
 +	} else if username != "" {
 +		user = url.User(username)
 +	}
 +
 +	u = &url.URL{
 +		Scheme: scheme,
 +		User:   user,
 +		Host:   net.JoinHostPort(host, port),
 +		Path:   path,
 +	}
 +
 +	return u, nil
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/amqp/url_test.go
----------------------------------------------------------------------
diff --cc amqp/url_test.go
index f80f1c4,0000000..99b656d
mode 100644,000000..100644
--- a/amqp/url_test.go
+++ b/amqp/url_test.go
@@@ -1,51 -1,0 +1,51 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package amqp
 +
 +import (
 +	"fmt"
 +)
 +
 +func ExampleParseURL() {
 +	for _, s := range []string{
 +		"amqp://username:password@host:1234/path",
 +		"host:1234",
 +		"host",
 +		":1234",
 +		"host/path",
 +		"amqps://host",
 +		"",
 +	} {
 +		u, err := ParseURL(s)
 +		if err != nil {
 +			fmt.Println(err)
 +		} else {
 +			fmt.Println(u)
 +		}
 +	}
 +	// Output:
 +	// amqp://username:password@host:1234/path
 +	// amqp://host:1234
 +	// amqp://host:amqp
 +	// amqp://:1234
 +	// amqp://host:amqp/path
 +	// amqps://host:amqps
- 	// proton: bad URL ""
++	// bad URL ""
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index d6761d6,0000000..8a9e6cd
mode 100644,000000..100644
--- a/electron/connection.go
+++ b/electron/connection.go
@@@ -1,218 -1,0 +1,238 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +// #include <proton/disposition.h>
 +import "C"
 +
 +import (
 +	"net"
- 	"qpid.apache.org/amqp"
- 	"qpid.apache.org/internal"
 +	"qpid.apache.org/proton"
 +	"sync"
 +	"time"
 +)
 +
 +// Connection is an AMQP connection, created by a Container.
 +type Connection interface {
 +	Endpoint
 +
 +	// Sender opens a new sender on the DefaultSession.
- 	//
- 	// v can be a string, which is used as the Target address, or a SenderSettings
- 	// struct containing more details settings.
- 	Sender(...LinkSetting) (Sender, error)
++	Sender(...LinkOption) (Sender, error)
 +
 +	// Receiver opens a new Receiver on the DefaultSession().
- 	//
- 	// v can be a string, which is used as the
- 	// Source address, or a ReceiverSettings struct containing more details
- 	// settings.
- 	Receiver(...LinkSetting) (Receiver, error)
++	Receiver(...LinkOption) (Receiver, error)
 +
 +	// DefaultSession() returns a default session for the connection. It is opened
 +	// on the first call to DefaultSession and returned on subsequent calls.
 +	DefaultSession() (Session, error)
 +
 +	// Session opens a new session.
- 	Session(...SessionSetting) (Session, error)
++	Session(...SessionOption) (Session, error)
 +
 +	// Container for the connection.
 +	Container() Container
 +
 +	// Disconnect the connection abruptly with an error.
 +	Disconnect(error)
 +
 +	// Wait waits for the connection to be disconnected.
 +	Wait() error
 +
 +	// WaitTimeout is like Wait but returns Timeout if the timeout expires.
 +	WaitTimeout(time.Duration) error
++
++	// Incoming returns a channel for incoming endpoints opened by the remote end.
++	//
++	// To enable, pass AllowIncoming() when creating the Connection. Otherwise all
++	// incoming endpoint requests are automatically rejected and Incoming()
++	// returns nil.
++	//
++	// An Incoming value can be an *IncomingSession, *IncomingSender or
++	// *IncomingReceiver.  You must call Accept() to open the endpoint or Reject()
++	// to close it with an error. The specific Incoming types have additional
++	// methods to configure the endpoint.
++	//
++	// Not receiving from Incoming() or not calling Accept/Reject will block the
++	// electron event loop. Normally you would have a dedicated goroutine receive
++	// from Incoming() and start new goroutines to serve each incoming endpoint.
++	// The channel is closed when the Connection closes.
++	//
++	Incoming() <-chan Incoming
 +}
 +
- // ConnectionSetting can be passed when creating a connection.
- // See functions that return ConnectionSetting for details
- type ConnectionSetting func(*connection)
++// ConnectionOption can be passed when creating a connection to configure various options.
++type ConnectionOption func(*connection)
 +
- // Server setting puts the connection in server mode.
++// Server returns a ConnectionOption to put the connection in server mode.
 +//
 +// A server connection will do protocol negotiation to accept an incoming AMQP
 +// connection. Normally you would call this for a connection created by
 +// net.Listener.Accept()
 +//
- func Server() ConnectionSetting { return func(c *connection) { c.engine.Server() } }
++func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } }
 +
- // Accepter provides a function to be called when a connection receives an incoming
- // request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver.
- //
- // The accept() function must not block or use the accepted endpoint.
- // It can pass the endpoint to another goroutine for processing.
- //
- // By default all incoming endpoints are rejected.
- func Accepter(accept func(Incoming)) ConnectionSetting {
- 	return func(c *connection) { c.accept = accept }
++// AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests.
++// See Connection.Incoming()
++func AllowIncoming() ConnectionOption {
++	return func(c *connection) { c.incoming = make(chan Incoming) }
 +}
 +
 +type connection struct {
 +	endpoint
- 	listenOnce, defaultSessionOnce, closeOnce sync.Once
++	defaultSessionOnce, closeOnce sync.Once
 +
 +	container   *container
 +	conn        net.Conn
- 	accept      func(Incoming)
++	incoming    chan Incoming
 +	handler     *handler
 +	engine      *proton.Engine
- 	err         internal.ErrorHolder
 +	eConnection proton.Connection
 +
 +	defaultSession Session
- 	done           chan struct{}
 +}
 +
- func newConnection(conn net.Conn, cont *container, setting ...ConnectionSetting) (*connection, error) {
- 	c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})}
++func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) {
++	c := &connection{container: cont, conn: conn}
 +	c.handler = newHandler(c)
 +	var err error
 +	c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
 +	if err != nil {
 +		return nil, err
 +	}
 +	for _, set := range setting {
 +		set(c)
 +	}
- 	c.str = c.engine.String()
++	c.endpoint = makeEndpoint(c.engine.String())
 +	c.eConnection = c.engine.Connection()
- 	go func() { c.engine.Run(); close(c.done) }()
++	go c.run()
 +	return c, nil
 +}
 +
++func (c *connection) run() {
++	c.engine.Run()
++	if c.incoming != nil {
++		close(c.incoming)
++	}
++	c.closed(Closed)
++}
++
 +func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
 +
 +func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) }
 +
- func (c *connection) Session(setting ...SessionSetting) (Session, error) {
++func (c *connection) Session(setting ...SessionOption) (Session, error) {
 +	var s Session
 +	err := c.engine.InjectWait(func() error {
++		if c.Error() != nil {
++			return c.Error()
++		}
 +		eSession, err := c.engine.Connection().Session()
 +		if err == nil {
 +			eSession.Open()
 +			if err == nil {
 +				s = newSession(c, eSession, setting...)
 +			}
 +		}
 +		return err
 +	})
 +	return s, err
 +}
 +
 +func (c *connection) Container() Container { return c.container }
 +
 +func (c *connection) DefaultSession() (s Session, err error) {
 +	c.defaultSessionOnce.Do(func() {
 +		c.defaultSession, err = c.Session()
 +	})
 +	if err == nil {
 +		err = c.Error()
 +	}
 +	return c.defaultSession, err
 +}
 +
- func (c *connection) Sender(setting ...LinkSetting) (Sender, error) {
++func (c *connection) Sender(setting ...LinkOption) (Sender, error) {
 +	if s, err := c.DefaultSession(); err == nil {
 +		return s.Sender(setting...)
 +	} else {
 +		return nil, err
 +	}
 +}
 +
- func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) {
++func (c *connection) Receiver(setting ...LinkOption) (Receiver, error) {
 +	if s, err := c.DefaultSession(); err == nil {
 +		return s.Receiver(setting...)
 +	} else {
 +		return nil, err
 +	}
 +}
 +
 +func (c *connection) Connection() Connection { return c }
 +
 +func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
 +func (c *connection) WaitTimeout(timeout time.Duration) error {
 +	_, err := timedReceive(c.done, timeout)
 +	if err == Timeout {
 +		return Timeout
 +	}
 +	return c.Error()
 +}
 +
++func (c *connection) Incoming() <-chan Incoming { return c.incoming }
++
 +// Incoming is the interface for incoming requests to open an endpoint.
 +// Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
 +type Incoming interface {
- 	// Accept the endpoint with default settings.
- 	//
- 	// You must not use the returned endpoint in the accept() function that
- 	// receives the Incoming value, but you can pass it to other goroutines.
- 	//
- 	// Implementing types provide type-specific Accept functions that take additional settings.
++	// Accept and open the endpoint.
 +	Accept() Endpoint
 +
 +	// Reject the endpoint with an error
 +	Reject(error)
 +
- 	error() error
++	// wait for and call the accept function, call in proton goroutine.
++	wait() error
++	pEndpoint() proton.Endpoint
 +}
 +
 +type incoming struct {
- 	err      error
- 	accepted bool
++	endpoint proton.Endpoint
++	acceptCh chan func() error
++}
++
++func makeIncoming(e proton.Endpoint) incoming {
++	return incoming{endpoint: e, acceptCh: make(chan func() error)}
 +}
 +
- func (i *incoming) Reject(err error) { i.err = err }
++func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } }
++
++// Call in proton goroutine: wait for the function sent by accept() or Reject(), then call it.
++func (in *incoming) wait() error { return (<-in.acceptCh)() }
++
++func (in *incoming) pEndpoint() proton.Endpoint { return in.endpoint }
 +
- func (i *incoming) error() error {
- 	switch {
- 	case i.err != nil:
- 		return i.err
- 	case !i.accepted:
- 		return amqp.Errorf(amqp.NotAllowed, "remote open rejected")
- 	default:
++// Called in app goroutine to send an accept function to proton and return the resulting endpoint.
++func (in *incoming) accept(f func() Endpoint) Endpoint {
++	done := make(chan Endpoint)
++	in.acceptCh <- func() error {
++		ep := f()
++		done <- ep
 +		return nil
 +	}
++	return <-done
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/container.go
----------------------------------------------------------------------
diff --cc electron/container.go
index 7bbc4b0,0000000..b5ce6c0
mode 100644,000000..100644
--- a/electron/container.go
+++ b/electron/container.go
@@@ -1,71 -1,0 +1,77 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +	"net"
- 	"qpid.apache.org/internal"
++	"qpid.apache.org/proton"
++	"strconv"
++	"sync/atomic"
 +)
 +
 +// Container is an AMQP container, it represents a single AMQP "application". It
 +// provides functions to create new Connections to remote containers.
 +//
 +// Create with NewContainer()
 +//
 +type Container interface {
 +	// Id is a unique identifier for the container in your distributed application.
 +	Id() string
 +
 +	// Create a new AMQP Connection over the supplied net.Conn connection.
 +	//
 +	// You must call Connection.Open() on the returned Connection, after
 +	// setting any Connection properties you need to set. Note the net.Conn
 +	// can be an outgoing connection (e.g. made with net.Dial) or an incoming
 +	// connection (e.g. made with net.Listener.Accept())
- 	Connection(net.Conn, ...ConnectionSetting) (Connection, error)
++	Connection(net.Conn, ...ConnectionOption) (Connection, error)
 +}
 +
 +type container struct {
- 	id        string
- 	linkNames internal.IdCounter
++	id         string
++	tagCounter uint64
++}
++
++func (cont *container) nextTag() string {
++	return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32)
 +}
 +
 +// NewContainer creates a new container. The id must be unique in your
 +// distributed application, all connections created by the container
 +// will have this container-id.
 +//
 +// If id == "" a random UUID will be generated for the id.
 +func NewContainer(id string) Container {
 +	if id == "" {
- 		id = internal.UUID4().String()
++		id = proton.UUID4().String()
 +	}
 +	cont := &container{id: id}
 +	return cont
 +}
 +
 +func (cont *container) Id() string { return cont.id }
 +
 +func (cont *container) nextLinkName() string {
- 	return cont.id + "@" + cont.linkNames.Next()
++	return cont.id + "@" + cont.nextTag()
 +}
 +
- func (cont *container) Connection(conn net.Conn, setting ...ConnectionSetting) (Connection, error) {
++func (cont *container) Connection(conn net.Conn, setting ...ConnectionOption) (Connection, error) {
 +	return newConnection(conn, cont, setting...)
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/doc.go
----------------------------------------------------------------------
diff --cc electron/doc.go
index eaa6e7a,0000000..46bde37
mode 100644,000000..100644
--- a/electron/doc.go
+++ b/electron/doc.go
@@@ -1,57 -1,0 +1,63 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +/*
 +Package electron is a procedural, concurrent-safe Go library for AMQP messaging.
 +You can write clients and servers using this library.
 +
 +Start by creating a Container with NewContainer. A Container represents a client
 +or server application that can contain many incoming or outgoing connections.
 +
 +Create connections with the standard Go 'net' package using net.Dial or
 +net.Listen. Create an AMQP connection over a net.Conn with
 +Container.Connection() and open it with Connection.Open().
 +
 +AMQP sends messages over "links". Each link has a Sender end and a Receiver
 +end. Connection.Sender() and Connection.Receiver() allow you to create links to
 +Send() and Receive() messages.
 +
 +You can create an AMQP server connection by calling Connection.Server() and
 +Connection.Listen() before calling Connection.Open(). A server connection can
 +negotiate protocol security details and can accept incoming links opened from
- the remote end of the connection
++the remote end of the connection.
++
 +*/
 +package electron
 +
 +//#cgo LDFLAGS: -lqpid-proton
 +import "C"
 +
 +// Just for package comment
 +
 +/* DEVELOPER NOTES
 +
 +There is a single proton.Engine per connection, each driving its own event-loop goroutine,
 +and each with a 'handler'. Most state for a connection is maintained on the handler, and
- only accessed in the event-loop goroutine, so no locks are required.
++only accessed in the event-loop goroutine, so no locks are required there.
 +
 +The handler sets up channels as needed to get or send data from user goroutines
- using electron types like Sender or Receiver. We also use Engine.Inject to inject
- actions into the event loop from user goroutines.
++using electron types like Sender or Receiver.
++
++We also use Engine.Inject to inject actions into the event loop from user
++goroutines. It is important to check at the start of an injected function that
++required objects are still valid, for example a link may be remotely closed
++between the time a Sender function calls Inject and the time the injected
++function is executed by the handler goroutine. See comments in endpoint.go for more.
 +
 +*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/endpoint.go
----------------------------------------------------------------------
diff --cc electron/endpoint.go
index 745fd04,0000000..8cbeadb
mode 100644,000000..100644
--- a/electron/endpoint.go
+++ b/electron/endpoint.go
@@@ -1,68 -1,0 +1,94 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +	"io"
- 	"qpid.apache.org/internal"
 +	"qpid.apache.org/proton"
 +)
 +
 +// Closed is an alias for io.EOF. It is returned as an error when an endpoint
 +// was closed cleanly.
 +var Closed = io.EOF
 +
 +// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver.
 +//
 +// Endpoints can be created locally or by the remote peer. You must Open() an
 +// endpoint before you can use it. Some endpoints have additional Set*() methods
 +// that must be called before Open() to take effect, see Connection, Session,
 +// Link, Sender and Receiver for details.
 +//
 +type Endpoint interface {
 +	// Close an endpoint and signal an error to the remote end if error != nil.
 +	Close(error)
 +
 +	// String is a human readable identifier, useful for debugging and logging.
 +	String() string
 +
 +	// Error returns nil if the endpoint is open, otherwise returns an error.
 +	// Error() == Closed means the endpoint was closed without error.
 +	Error() error
 +
 +	// Connection containing the endpoint
 +	Connection() Connection
++
++	// Done returns a channel that will close when the endpoint closes.
++	// Error() will contain the reason.
++	Done() <-chan struct{}
 +}
 +
++// DEVELOPER NOTES
++//
++// An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated
++//
 +type endpoint struct {
- 	err internal.ErrorHolder
- 	str string // Must be set by the value that embeds endpoint.
++	err  proton.ErrorHolder
++	str  string // Must be set by the value that embeds endpoint.
++	done chan struct{}
++}
++
++func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} }
++
++// Called in handler on a Closed event. Marks the endpoint as closed and the corresponding
++// proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure
++// the pointer has not been invalidated.
++//
++// Returns the error stored on the endpoint, which may be different from err if there was
++// already an error.
++func (e *endpoint) closed(err error) error {
++	e.err.Set(err)
++	e.err.Set(Closed)
++	close(e.done)
++	return e.err.Get()
 +}
 +
 +func (e *endpoint) String() string { return e.str }
- func (e *endpoint) Error() error   { return e.err.Get() }
 +
- // Call in proton goroutine to close an endpoint locally
++func (e *endpoint) Error() error { return e.err.Get() }
++
++func (e *endpoint) Done() <-chan struct{} { return e.done }
++
++// Call in proton goroutine to initiate closing an endpoint locally
 +// handler will complete the close when remote end closes.
 +func localClose(ep proton.Endpoint, err error) {
 +	if ep.State().LocalActive() {
 +		proton.CloseError(ep, err)
 +	}
 +}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/error.go
----------------------------------------------------------------------
diff --cc electron/error.go
index 0000000,0000000..4dcfd94
new file mode 100644
--- /dev/null
+++ b/electron/error.go
@@@ -1,0 -1,0 +1,35 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"fmt"
++)
++
++// assert panics if condition is false with optional formatted message
++func assert(condition bool, format ...interface{}) {
++	if !condition {
++		if len(format) > 0 {
++			panic(fmt.Errorf(format[0].(string), format[1:]...))
++		} else {
++			panic(fmt.Errorf("assertion failed"))
++		}
++	}
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/handler.go
----------------------------------------------------------------------
diff --cc electron/handler.go
index b518e42,0000000..0237156
mode 100644,000000..100644
--- a/electron/handler.go
+++ b/electron/handler.go
@@@ -1,158 -1,0 +1,187 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +package electron
 +
 +import (
 +	"qpid.apache.org/amqp"
 +	"qpid.apache.org/proton"
 +)
 +
 +// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
 +
 +type handler struct {
 +	delegator    *proton.MessagingAdapter
 +	connection   *connection
 +	links        map[proton.Link]Link
- 	sentMessages map[proton.Delivery]*sentMessage
++	sentMessages map[proton.Delivery]sentMessage
 +	sessions     map[proton.Session]*session
 +}
 +
 +func newHandler(c *connection) *handler {
 +	h := &handler{
 +		connection:   c,
 +		links:        make(map[proton.Link]Link),
- 		sentMessages: make(map[proton.Delivery]*sentMessage),
++		sentMessages: make(map[proton.Delivery]sentMessage),
 +		sessions:     make(map[proton.Session]*session),
 +	}
 +	h.delegator = proton.NewMessagingAdapter(h)
 +	// Disable auto features of MessagingAdapter, we do these ourselves.
 +	h.delegator.Prefetch = 0
 +	h.delegator.AutoAccept = false
 +	h.delegator.AutoSettle = false
 +	h.delegator.AutoOpen = false
 +	return h
 +}
 +
- func (h *handler) internalError(fmt string, arg ...interface{}) {
- 	proton.CloseError(h.connection.eConnection, amqp.Errorf(amqp.InternalError, fmt, arg...))
++func (h *handler) linkError(l proton.Link, msg string) {
++	proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l))
 +}
 +
 +func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
 +	switch t {
 +
 +	case proton.MMessage:
 +		if r, ok := h.links[e.Link()].(*receiver); ok {
 +			r.message(e.Delivery())
 +		} else {
- 			h.internalError("no receiver for link %s", e.Link())
++			h.linkError(e.Link(), "no receiver")
 +		}
 +
 +	case proton.MSettled:
- 		if sm := h.sentMessages[e.Delivery()]; sm != nil {
- 			sm.settled(nil)
++		if sm, ok := h.sentMessages[e.Delivery()]; ok {
++			d := e.Delivery().Remote()
++			sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value}
++			delete(h.sentMessages, e.Delivery())
 +		}
 +
 +	case proton.MSendable:
 +		if s, ok := h.links[e.Link()].(*sender); ok {
 +			s.sendable()
 +		} else {
- 			h.internalError("no receiver for link %s", e.Link())
++			h.linkError(e.Link(), "no sender")
 +		}
 +
 +	case proton.MSessionOpening:
 +		if e.Session().State().LocalUninit() { // Remotely opened
- 			incoming := &IncomingSession{h: h, pSession: e.Session()}
- 			h.connection.accept(incoming)
- 			if err := incoming.error(); err != nil {
- 				proton.CloseError(e.Session(), err)
- 			}
++			h.incoming(newIncomingSession(h, e.Session()))
 +		}
 +
 +	case proton.MSessionClosed:
- 		err := proton.EndpointError(e.Session())
- 		for l, _ := range h.links {
- 			if l.Session() == e.Session() {
- 				h.linkClosed(l, err)
- 			}
- 		}
- 		delete(h.sessions, e.Session())
++		h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
 +
 +	case proton.MLinkOpening:
 +		l := e.Link()
 +		if l.State().LocalActive() { // Already opened locally.
 +			break
 +		}
 +		ss := h.sessions[l.Session()]
 +		if ss == nil {
- 			h.internalError("no session for link %s", e.Link())
++			h.linkError(e.Link(), "no session")
 +			break
 +		}
- 		var incoming Incoming
 +		if l.IsReceiver() {
- 			incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
++			h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
 +		} else {
- 			incoming = &IncomingSender{makeIncomingLink(ss, l)}
- 		}
- 		h.connection.accept(incoming)
- 		if err := incoming.error(); err != nil {
- 			proton.CloseError(l, err)
- 			break
++			h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
 +		}
 +
 +	case proton.MLinkClosing:
 +		e.Link().Close()
 +
 +	case proton.MLinkClosed:
 +		h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
 +
 +	case proton.MConnectionClosing:
 +		h.connection.err.Set(e.Connection().RemoteCondition().Error())
 +
 +	case proton.MConnectionClosed:
- 		h.connection.err.Set(Closed) // If no error already set, this is an orderly close.
++		h.connectionClosed(proton.EndpointError(e.Connection()))
 +
 +	case proton.MDisconnected:
 +		h.connection.err.Set(e.Transport().Condition().Error())
 +		// If err not set at this point (e.g. to Closed) then this is unexpected.
 +		h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
 +
 +		err := h.connection.Error()
++
 +		for l, _ := range h.links {
 +			h.linkClosed(l, err)
 +		}
++		h.links = nil
 +		for _, s := range h.sessions {
 +			s.closed(err)
 +		}
++		h.sessions = nil
 +		for _, sm := range h.sentMessages {
- 			sm.settled(err)
++			sm.ack <- Outcome{Unacknowledged, err, sm.value}
 +		}
++		h.sentMessages = nil
++	}
++}
++
++func (h *handler) incoming(in Incoming) {
++	var err error
++	if h.connection.incoming != nil {
++		h.connection.incoming <- in
++		err = in.wait()
++	} else {
++		err = amqp.Errorf(amqp.NotAllowed, "rejected incoming %s %s",
++			in.pEndpoint().Type(), in.pEndpoint().String())
++	}
++	if err == nil {
++		in.pEndpoint().Open()
++	} else {
++		proton.CloseError(in.pEndpoint(), err)
 +	}
 +}
 +
++func (h *handler) addLink(pl proton.Link, el Link) {
++	h.links[pl] = el
++}
++
 +func (h *handler) linkClosed(l proton.Link, err error) {
- 	if link := h.links[l]; link != nil {
++	if link, ok := h.links[l]; ok {
 +		link.closed(err)
 +		delete(h.links, l)
 +	}
 +}
 +
- func (h *handler) addLink(rl proton.Link, ll Link) {
- 	h.links[rl] = ll
++func (h *handler) sessionClosed(ps proton.Session, err error) {
++	if s, ok := h.sessions[ps]; ok {
++		delete(h.sessions, ps)
++		err = s.closed(err)
++		for l, _ := range h.links {
++			if l.Session() == ps {
++				h.linkClosed(l, err)
++			}
++		}
++	}
++}
++
++func (h *handler) connectionClosed(err error) {
++	err = h.connection.closed(err)
++	// Close links first to avoid repeated scans of the link list by sessions.
++	for l, _ := range h.links {
++		h.linkClosed(l, err)
++	}
++	for s, _ := range h.sessions {
++		h.sessionClosed(s, err)
++	}
 +}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org