You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/10/23 16:36:50 UTC

[48/50] [abbrv] qpid-proton git commit: Merge branch 'master' into go1 - 0.11 alpa

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/container.go
----------------------------------------------------------------------
diff --cc electron/container.go
index 0000000,0000000..7bbc4b0
new file mode 100644
--- /dev/null
+++ b/electron/container.go
@@@ -1,0 -1,0 +1,71 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"net"
++	"qpid.apache.org/internal"
++)
++
++// Container is an AMQP container, it represents a single AMQP "application". It
++// provides functions to create new Connections to remote containers.
++//
++// Create with NewContainer()
++//
++type Container interface {
++	// Id is a unique identifier for the container in your distributed application.
++	Id() string
++
++	// Create a new AMQP Connection over the supplied net.Conn connection.
++	//
++	// You must call Connection.Open() on the returned Connection, after
++	// setting any Connection properties you need to set. Note the net.Conn
++	// can be an outgoing connection (e.g. made with net.Dial) or an incoming
++	// connection (e.g. made with net.Listener.Accept())
++	Connection(net.Conn, ...ConnectionSetting) (Connection, error)
++}
++
++type container struct {
++	id        string
++	linkNames internal.IdCounter
++}
++
++// NewContainer creates a new container. The id must be unique in your
++// distributed application, all connections created by the container
++// will have this container-id.
++//
++// If id == "" a random UUID will be generated for the id.
++func NewContainer(id string) Container {
++	if id == "" {
++		id = internal.UUID4().String()
++	}
++	cont := &container{id: id}
++	return cont
++}
++
++func (cont *container) Id() string { return cont.id }
++
++func (cont *container) nextLinkName() string {
++	return cont.id + "@" + cont.linkNames.Next()
++}
++
++func (cont *container) Connection(conn net.Conn, setting ...ConnectionSetting) (Connection, error) {
++	return newConnection(conn, cont, setting...)
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/doc.go
----------------------------------------------------------------------
diff --cc electron/doc.go
index 0000000,0000000..eaa6e7a
new file mode 100644
--- /dev/null
+++ b/electron/doc.go
@@@ -1,0 -1,0 +1,57 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++/*
++Package electron is a procedural, concurrent-safe Go library for AMQP messaging.
++You can write clients and servers using this library.
++
++Start by creating a Container with NewContainer. A Container represents a client
++or server application that can contain many incoming or outgoing connections.
++
++Create connections with the standard Go 'net' package using net.Dial or
++net.Listen. Create an AMQP connection over a net.Conn with
++Container.Connection() and open it with Connection.Open().
++
++AMQP sends messages over "links". Each link has a Sender end and a Receiver
++end. Connection.Sender() and Connection.Receiver() allow you to create links to
++Send() and Receive() messages.
++
++You can create an AMQP server connection by calling Connection.Server() and
++Connection.Listen() before calling Connection.Open(). A server connection can
++negotiate protocol security details and can accept incoming links opened from
++the remote end of the connection.
++*/
++package electron
++
++//#cgo LDFLAGS: -lqpid-proton
++import "C"
++
++// Just for package comment
++
++/* DEVELOPER NOTES
++
++There is a single proton.Engine per connection, each driving its own event-loop goroutine,
++and each with a 'handler'. Most state for a connection is maintained on the handler, and
++only accessed in the event-loop goroutine, so no locks are required.
++
++The handler sets up channels as needed to get or send data from user goroutines
++using electron types like Sender or Receiver. We also use Engine.Inject to inject
++actions into the event loop from user goroutines.
++
++*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/endpoint.go
----------------------------------------------------------------------
diff --cc electron/endpoint.go
index 0000000,0000000..745fd04
new file mode 100644
--- /dev/null
+++ b/electron/endpoint.go
@@@ -1,0 -1,0 +1,68 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"io"
++	"qpid.apache.org/internal"
++	"qpid.apache.org/proton"
++)
++
++// Closed is an alias for io.EOF. It is returned as an error when an endpoint
++// was closed cleanly.
++var Closed = io.EOF
++
++// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver.
++//
++// Endpoints can be created locally or by the remote peer. You must Open() an
++// endpoint before you can use it. Some endpoints have additional Set*() methods
++// that must be called before Open() to take effect, see Connection, Session,
++// Link, Sender and Receiver for details.
++//
++type Endpoint interface {
++	// Close an endpoint and signal an error to the remote end if error != nil.
++	Close(error)
++
++	// String is a human readable identifier, useful for debugging and logging.
++	String() string
++
++	// Error returns nil if the endpoint is open, otherwise returns an error.
++	// Error() == Closed means the endpoint was closed without error.
++	Error() error
++
++	// Connection containing the endpoint
++	Connection() Connection
++}
++
++type endpoint struct {
++	err internal.ErrorHolder
++	str string // Must be set by the value that embeds endpoint.
++}
++
++func (e *endpoint) String() string { return e.str }
++func (e *endpoint) Error() error   { return e.err.Get() }
++
++// Call in proton goroutine to close an endpoint locally
++// handler will complete the close when remote end closes.
++func localClose(ep proton.Endpoint, err error) {
++	if ep.State().LocalActive() {
++		proton.CloseError(ep, err)
++	}
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/handler.go
----------------------------------------------------------------------
diff --cc electron/handler.go
index 0000000,0000000..b518e42
new file mode 100644
--- /dev/null
+++ b/electron/handler.go
@@@ -1,0 -1,0 +1,158 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"qpid.apache.org/amqp"
++	"qpid.apache.org/proton"
++)
++
++// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
++
++type handler struct {
++	delegator    *proton.MessagingAdapter
++	connection   *connection
++	links        map[proton.Link]Link
++	sentMessages map[proton.Delivery]*sentMessage
++	sessions     map[proton.Session]*session
++}
++
++func newHandler(c *connection) *handler {
++	h := &handler{
++		connection:   c,
++		links:        make(map[proton.Link]Link),
++		sentMessages: make(map[proton.Delivery]*sentMessage),
++		sessions:     make(map[proton.Session]*session),
++	}
++	h.delegator = proton.NewMessagingAdapter(h)
++	// Disable auto features of MessagingAdapter, we do these ourselves.
++	h.delegator.Prefetch = 0
++	h.delegator.AutoAccept = false
++	h.delegator.AutoSettle = false
++	h.delegator.AutoOpen = false
++	return h
++}
++
++func (h *handler) internalError(fmt string, arg ...interface{}) {
++	proton.CloseError(h.connection.eConnection, amqp.Errorf(amqp.InternalError, fmt, arg...))
++}
++
++// HandleMessagingEvent dispatches a proton messaging event to the links, sessions and sent messages tracked by h.
++func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
++	switch t {
++
++	case proton.MMessage:
++		if r, ok := h.links[e.Link()].(*receiver); ok {
++			r.message(e.Delivery())
++		} else {
++			h.internalError("no receiver for link %s", e.Link())
++		}
++
++	case proton.MSettled:
++		if sm := h.sentMessages[e.Delivery()]; sm != nil {
++			sm.settled(nil)
++		}
++
++	case proton.MSendable:
++		if s, ok := h.links[e.Link()].(*sender); ok {
++			s.sendable()
++		} else {
++			h.internalError("no sender for link %s", e.Link())
++		}
++
++	case proton.MSessionOpening:
++		if e.Session().State().LocalUninit() { // Remotely opened
++			incoming := &IncomingSession{h: h, pSession: e.Session()}
++			h.connection.accept(incoming)
++			if err := incoming.error(); err != nil {
++				proton.CloseError(e.Session(), err)
++			}
++		}
++
++	case proton.MSessionClosed:
++		err := proton.EndpointError(e.Session())
++		for l := range h.links { // Close every link that belongs to the closed session.
++			if l.Session() == e.Session() {
++				h.linkClosed(l, err)
++			}
++		}
++		delete(h.sessions, e.Session())
++
++	case proton.MLinkOpening:
++		l := e.Link()
++		if l.State().LocalActive() { // Already opened locally.
++			break
++		}
++		ss := h.sessions[l.Session()]
++		if ss == nil {
++			h.internalError("no session for link %s", e.Link())
++			break
++		}
++		var incoming Incoming
++		if l.IsReceiver() {
++			incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
++		} else {
++			incoming = &IncomingSender{makeIncomingLink(ss, l)}
++		}
++		h.connection.accept(incoming)
++		if err := incoming.error(); err != nil {
++			proton.CloseError(l, err)
++		}
++
++	case proton.MLinkClosing:
++		e.Link().Close()
++
++	case proton.MLinkClosed:
++		h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
++
++	case proton.MConnectionClosing:
++		h.connection.err.Set(e.Connection().RemoteCondition().Error())
++
++	case proton.MConnectionClosed:
++		h.connection.err.Set(Closed) // If no error already set, this is an orderly close.
++
++	case proton.MDisconnected:
++		h.connection.err.Set(e.Transport().Condition().Error())
++		// If err not set at this point (e.g. to Closed) then this is unexpected.
++		h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
++
++		err := h.connection.Error()
++		for l := range h.links {
++			h.linkClosed(l, err)
++		}
++		for _, s := range h.sessions {
++			s.closed(err)
++		}
++		for _, sm := range h.sentMessages {
++			sm.settled(err)
++		}
++	}
++}
++
++func (h *handler) linkClosed(l proton.Link, err error) {
++	if link := h.links[l]; link != nil {
++		link.closed(err)
++		delete(h.links, l)
++	}
++}
++
++func (h *handler) addLink(rl proton.Link, ll Link) {
++	h.links[rl] = ll
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/link.go
----------------------------------------------------------------------
diff --cc electron/link.go
index 0000000,0000000..4bef53b
new file mode 100644
--- /dev/null
+++ b/electron/link.go
@@@ -1,0 -1,0 +1,247 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"qpid.apache.org/internal"
++	"qpid.apache.org/proton"
++)
++
++// Link is the common interface for AMQP links. Sender and Receiver provide
++// more methods for the sending or receiving end of a link respectively.
++type Link interface {
++	Endpoint
++
++	// Source address that messages are coming from.
++	Source() string
++
++	// Target address that messages are going to.
++	Target() string
++
++	// LinkName is a unique name for the link among links between the same
++	// containers in the same direction. By default generated automatically.
++	LinkName() string
++
++	// IsSender is true if this is the sending end of the link.
++	IsSender() bool
++
++	// IsReceiver is true if this is the receiving end of the link.
++	IsReceiver() bool
++
++	// SndSettle defines when the sending end of the link settles message delivery.
++	SndSettle() SndSettleMode
++
++	// RcvSettle defines when the receiving end of the link settles message delivery.
++	RcvSettle() RcvSettleMode
++
++	// Session containing the Link
++	Session() Session
++
++	// Called in event loop on closed event.
++	closed(err error)
++	// Called to open a link (local or accepted incoming link)
++	open()
++}
++
++// LinkSetting can be passed when creating a sender or receiver.
++// See functions that return LinkSetting for details
++type LinkSetting func(*link)
++
++// Source sets address that messages are coming from.
++func Source(s string) LinkSetting { return func(l *link) { l.source = s } }
++
++// Target sets address that messages are going to.
++func Target(s string) LinkSetting { return func(l *link) { l.target = s } }
++
++// LinkName sets the link name. If not set, localLink generates one from the container.
++func LinkName(s string) LinkSetting { return func(l *link) { l.linkName = s } }
++
++// SndSettle sets the send settle mode
++func SndSettle(m SndSettleMode) LinkSetting { return func(l *link) { l.sndSettle = m } }
++
++// RcvSettle sets the receive settle mode
++func RcvSettle(m RcvSettleMode) LinkSetting { return func(l *link) { l.rcvSettle = m } }
++
++// SndSettleMode defines when the sending end of the link settles message delivery.
++type SndSettleMode proton.SndSettleMode
++
++// Capacity sets the link capacity
++func Capacity(n int) LinkSetting { return func(l *link) { l.capacity = n } }
++
++// Prefetch sets a receivers pre-fetch flag. Not relevant for a sender.
++func Prefetch(p bool) LinkSetting { return func(l *link) { l.prefetch = p } }
++
++// AtMostOnce sets "fire and forget" mode, messages are sent but no
++// acknowledgment is received, messages can be lost if there is a network
++// failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst
++func AtMostOnce() LinkSetting {
++	return func(l *link) {
++		SndSettle(SndSettled)(l)
++		RcvSettle(RcvFirst)(l)
++	}
++}
++
++// AtLeastOnce requests acknowledgment for every message, acknowledgment
++// indicates the message was definitely received. In the event of a
++// failure, unacknowledged messages can be re-sent but there is a chance
++// that the message will be received twice in this case.
++// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
++func AtLeastOnce() LinkSetting {
++	return func(l *link) {
++		SndSettle(SndUnsettled)(l)
++		RcvSettle(RcvFirst)(l)
++	}
++}
++
++const (
++	// Messages are sent unsettled
++	SndUnsettled = SndSettleMode(proton.SndUnsettled)
++	// Messages are sent already settled
++	SndSettled = SndSettleMode(proton.SndSettled)
++	// Sender can send either unsettled or settled messages.
++	SendMixed = SndSettleMode(proton.SndMixed)
++)
++
++// RcvSettleMode defines when the receiving end of the link settles message delivery.
++type RcvSettleMode proton.RcvSettleMode
++
++const (
++	// Receiver settles first.
++	RcvFirst = RcvSettleMode(proton.RcvFirst)
++	// Receiver waits for sender to settle before settling.
++	RcvSecond = RcvSettleMode(proton.RcvSecond)
++)
++
++type link struct {
++	endpoint
++
++	// Link settings.
++	source    string
++	target    string
++	linkName  string
++	isSender  bool
++	sndSettle SndSettleMode
++	rcvSettle RcvSettleMode
++	capacity  int
++	prefetch  bool
++
++	session *session
++	eLink   proton.Link
++	done    chan struct{} // Closed when link is closed
++}
++
++func (l *link) Source() string           { return l.source }
++func (l *link) Target() string           { return l.target }
++func (l *link) LinkName() string         { return l.linkName }
++func (l *link) IsSender() bool           { return l.isSender }
++func (l *link) IsReceiver() bool         { return !l.isSender }
++func (l *link) SndSettle() SndSettleMode { return l.sndSettle }
++func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle }
++func (l *link) Session() Session         { return l.session }
++func (l *link) Connection() Connection   { return l.session.Connection() }
++
++func (l *link) engine() *proton.Engine { return l.session.connection.engine }
++func (l *link) handler() *handler      { return l.session.connection.handler }
++
++// localLink applies the settings, creates the proton.Link on session sn and opens it.
++func localLink(sn *session, isSender bool, setting ...LinkSetting) (link, error) {
++	l := link{
++		session:  sn,
++		isSender: isSender,
++		capacity: 1,
++		prefetch: false,
++		done:     make(chan struct{}),
++	}
++	for _, set := range setting {
++		set(&l)
++	}
++	if l.linkName == "" {
++		l.linkName = l.session.connection.container.nextLinkName()
++	}
++	if l.IsSender() {
++		l.eLink = l.session.eSession.Sender(l.linkName)
++	} else {
++		l.eLink = l.session.eSession.Receiver(l.linkName)
++	}
++	if l.eLink.IsNil() {
++		l.err.Set(internal.Errorf("cannot create link %s", l.linkName)) // l.str is not set yet, report the name.
++		return l, l.err.Get()
++	}
++	l.eLink.Source().SetAddress(l.source)
++	l.eLink.Target().SetAddress(l.target)
++	l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
++	l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
++	l.str = l.eLink.String()
++	l.eLink.Open()
++	return l, nil
++}
++
++type incomingLink struct {
++	incoming
++	link
++}
++
++// Set up a link from an incoming proton.Link.
++func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
++	l := incomingLink{
++		link: link{
++			session:   sn,
++			isSender:  eLink.IsSender(),
++			eLink:     eLink,
++			source:    eLink.RemoteSource().Address(),
++			target:    eLink.RemoteTarget().Address(),
++			linkName:  eLink.Name(),
++			sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
++			rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
++			capacity:  1,
++			prefetch:  false,
++			done:      make(chan struct{}),
++		},
++	}
++	l.str = eLink.String()
++	return l
++}
++
++// Called in proton goroutine on closed or disconnected
++func (l *link) closed(err error) {
++	l.err.Set(err)
++	l.err.Set(Closed) // If no error set, mark as closed.
++	close(l.done)
++}
++
++// Not part of Link interface but use by Sender and Receiver.
++func (l *link) Credit() (credit int, err error) {
++	err = l.engine().InjectWait(func() error {
++		credit = l.eLink.Credit()
++		return nil
++	})
++	return
++}
++
++// Not part of Link interface but use by Sender and Receiver.
++func (l *link) Capacity() int { return l.capacity }
++
++func (l *link) Close(err error) {
++	l.engine().Inject(func() { localClose(l.eLink, err) })
++}
++
++func (l *link) open() {
++	l.eLink.Open()
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/messaging_test.go
----------------------------------------------------------------------
diff --cc electron/messaging_test.go
index 0000000,0000000..36b0c24
new file mode 100644
--- /dev/null
+++ b/electron/messaging_test.go
@@@ -1,0 -1,0 +1,416 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"fmt"
++	"net"
++	"path"
++	"qpid.apache.org/amqp"
++	"runtime"
++	"testing"
++	"time"
++)
++
++func fatalIf(t *testing.T, err error) {
++	if err != nil {
++		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
++		if ok {
++			_, file = path.Split(file)
++		}
++		t.Fatalf("(from %s:%d) %v", file, line, err)
++	}
++}
++
++// Start a server, return listening addr and channel for incoming Connection.
++func newServer(t *testing.T, cont Container, accept func(Endpoint) error) (net.Addr, <-chan Connection) {
++	listener, err := net.Listen("tcp", "")
++	fatalIf(t, err)
++	ch := make(chan Connection)
++	go func() {
++		conn, err := listener.Accept()
++		fatalIf(t, err) // Do not hand a failed Accept's conn to cont.Connection.
++		c, err := cont.Connection(conn)
++		fatalIf(t, err)
++		c.Server()
++		c.Listen(accept)
++		fatalIf(t, c.Open())
++		ch <- c
++	}()
++	return listener.Addr(), ch
++}
++
++// Open a client connection to addr and a session on it, return the session.
++func newClient(t *testing.T, cont Container, addr net.Addr) Session {
++	conn, err := net.Dial(addr.Network(), addr.String())
++	fatalIf(t, err)
++	c, err := cont.Connection(conn)
++	fatalIf(t, err)
++	fatalIf(t, c.Open()) // Open returns an error (newServer checks it too).
++	sn, err := c.Session()
++	fatalIf(t, err)
++	return sn
++}
++
++// Return client and server ends of the same connection.
++func newClientServer(t *testing.T, accept func(Endpoint) error) (client Session, server Connection) {
++	addr, ch := newServer(t, NewContainer(""), accept)
++	client = newClient(t, NewContainer(""), addr)
++	return client, <-ch
++}
++
++// Close client and server
++func closeClientServer(client Session, server Connection) {
++	client.Connection().Close(nil)
++	server.Close(nil)
++}
++
++// Send a message one way with a client sender and server receiver, verify ack.
++func TestClientSendServerReceive(t *testing.T) {
++	timeout := time.Second * 2
++	nLinks := 3
++	nMessages := 3
++
++	rchan := make(chan Receiver, nLinks)
++	client, server := newClientServer(t, func(ep Endpoint) error {
++		if r, ok := ep.(Receiver); ok {
++			r.SetCapacity(1, false)
++			rchan <- r
++		}
++		return nil
++	})
++
++	defer func() {
++		closeClientServer(client, server)
++	}()
++
++	s := make([]Sender, nLinks)
++	for i := 0; i < nLinks; i++ {
++		var err error
++		s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
++		if err != nil {
++			t.Fatal(err)
++		}
++	}
++	r := make([]Receiver, nLinks)
++	for i := 0; i < nLinks; i++ {
++		r[i] = <-rchan
++	}
++
++	for i := 0; i < nLinks; i++ {
++		for j := 0; j < nMessages; j++ {
++			var sm SentMessage
++			var sendErr error // Written by the send goroutine, read only after sendDone is closed.
++
++			// Client send
++			sendDone := make(chan struct{})
++			go func() {
++				defer close(sendDone)
++				m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
++				sm, sendErr = s[i].Send(m)
++			}()
++
++			// Server receive
++			rm, err := r[i].Receive()
++			if err != nil {
++				t.Fatal(err)
++			}
++			if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
++				t.Errorf("%#v != %#v", want, got)
++			}
++
++			// Should not be acknowledged on client yet
++			<-sendDone
++			if sendErr != nil { // t.Fatal must run on the test goroutine, not in the sender goroutine.
++				t.Fatal(sendErr)
++			}
++			if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d {
++				t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err)
++			}
++			// Server ack
++			if err := rm.Acknowledge(Rejected); err != nil {
++				t.Error(err)
++			}
++			// Client get ack.
++			if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d {
++				t.Errorf("want [rejected/nil] got [%v/%v]", d, err)
++			}
++		}
++	}
++}
++
++func TestClientReceiver(t *testing.T) {
++	nMessages := 3
++	client, server := newClientServer(t, func(ep Endpoint) error {
++		if s, ok := ep.(Sender); ok {
++			go func() {
++				for i := int32(0); i < int32(nMessages); i++ {
++					sm, err := s.Send(amqp.NewMessageWith(i))
++					if err != nil {
++						t.Error(err)
++						return
++					} else {
++						sm.Disposition() // Sync send.
++					}
++				}
++				s.Close(nil)
++			}()
++		}
++		return nil
++	})
++
++	r, err := client.Receiver(Source("foo"))
++	if err != nil {
++		t.Fatal(err)
++	}
++	for i := int32(0); i < int32(nMessages); i++ {
++		rm, err := r.Receive()
++		if err != nil {
++			if err != Closed {
++				t.Error(err)
++			}
++			break
++		}
++		if err := rm.Accept(); err != nil {
++			t.Error(err)
++		}
++		if b, ok := rm.Message.Body().(int32); !ok || b != i {
++			t.Errorf("want %v, true got %v, %v", i, b, ok)
++		}
++	}
++	server.Close(nil)
++	client.Connection().Close(nil)
++}
++
++// Test timeout versions of waiting functions.
++func TestTimeouts(t *testing.T) {
++	var err error
++	rchan := make(chan Receiver, 1)
++	client, server := newClientServer(t, func(ep Endpoint) error {
++		if r, ok := ep.(Receiver); ok {
++			r.SetCapacity(1, false) // Issue credit only on receive
++			rchan <- r
++		}
++		return nil
++	})
++	defer func() { closeClientServer(client, server) }()
++
++	// Open client sender
++	snd, err := client.Sender(Target("test"))
++	if err != nil {
++		t.Fatal(err)
++	}
++	rcv := <-rchan
++
++	// Test send with timeout
++	short := time.Millisecond
++	long := time.Second
++	m := amqp.NewMessage()
++	if _, err = snd.SendTimeout(m, 0); err != Timeout { // No credit, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	if _, err = snd.SendTimeout(m, short); err != Timeout { // No credit, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	// Test receive with timeout
++	if _, err = rcv.ReceiveTimeout(0); err != Timeout { // Nothing sent yet, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	// Test receive with timeout
++	if _, err = rcv.ReceiveTimeout(short); err != Timeout { // Nothing sent yet, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	// There is now a credit on the link due to receive
++	sm, err := snd.SendTimeout(m, long)
++	if err != nil {
++		t.Fatal(err)
++	}
++	// Disposition should timeout
++	if _, err = sm.DispositionTimeout(long); err != Timeout {
++		t.Error("want Timeout got", err)
++	}
++	if _, err = sm.DispositionTimeout(short); err != Timeout {
++		t.Error("want Timeout got", err)
++	}
++	// Receive and accept
++	rm, err := rcv.ReceiveTimeout(long)
++	if err != nil {
++		t.Fatal(err)
++	}
++	fatalIf(t, rm.Accept())
++	// Sender get ack
++	d, err := sm.DispositionTimeout(long)
++	if err != nil || d != Accepted {
++		t.Errorf("want (accepted, nil) got (%v, %v)", d, err)
++	}
++}
++
++// clientServer that returns sender/receiver pairs at opposite ends of link.
++type pairs struct {
++	t      *testing.T
++	client Session
++	server Connection
++	rchan  chan Receiver
++	schan  chan Sender
++}
++
++func newPairs(t *testing.T) *pairs {
++	p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
++	p.client, p.server = newClientServer(t, func(ep Endpoint) error {
++		switch ep := ep.(type) {
++		case Receiver:
++			ep.SetCapacity(1, false)
++			p.rchan <- ep
++		case Sender:
++			p.schan <- ep
++		}
++		return nil
++	})
++	return p
++}
++
++func (p *pairs) close() {
++	closeClientServer(p.client, p.server)
++}
++
++func (p *pairs) senderReceiver() (Sender, Receiver) {
++	snd, err := p.client.Sender()
++	fatalIf(p.t, err)
++	rcv := <-p.rchan
++	return snd, rcv
++}
++
++func (p *pairs) receiverSender() (Receiver, Sender) {
++	rcv, err := p.client.Receiver()
++	fatalIf(p.t, err)
++	snd := <-p.schan
++	return rcv, snd
++}
++
++type result struct {
++	label string // Name of the operation that produced err ("send", "receive", "disposition").
++	err   error  // Error returned by the operation, may be nil.
++}
++
++func (r result) String() string { return fmt.Sprintf("%v(%s)", r.err, r.label) } // %v prints a nil err as <nil>.
++
++func doSend(snd Sender, results chan result) {
++	_, err := snd.Send(amqp.NewMessage())
++	results <- result{"send", err}
++}
++
++func doReceive(rcv Receiver, results chan result) {
++	_, err := rcv.Receive()
++	results <- result{"receive", err}
++}
++
++func doDisposition(sm SentMessage, results chan result) {
++	_, err := sm.Disposition()
++	results <- result{"disposition", err}
++}
++
++// Test that closing Links interrupts blocked link functions.
++func TestLinkCloseInterrupt(t *testing.T) {
++	want := amqp.Errorf("x", "all bad")
++	pairs := newPairs(t)
++	results := make(chan result) // Collect expected errors
++
++	// Sender.Close() interrupts Send()
++	snd, rcv := pairs.senderReceiver()
++	go doSend(snd, results)
++	snd.Close(want)
++	if r := <-results; want != r.err {
++		t.Errorf("want %#v got %#v", want, r)
++	}
++
++	// Remote Receiver.Close() interrupts Send()
++	snd, rcv = pairs.senderReceiver()
++	go doSend(snd, results)
++	rcv.Close(want)
++	if r := <-results; want != r.err {
++		t.Errorf("want %#v got %#v", want, r)
++	}
++
++	// Receiver.Close() interrupts Receive()
++	snd, rcv = pairs.senderReceiver()
++	go doReceive(rcv, results)
++	rcv.Close(want)
++	if r := <-results; want != r.err {
++		t.Errorf("want %#v got %#v", want, r)
++	}
++
++	// Remote Sender.Close() interrupts Receive()
++	snd, rcv = pairs.senderReceiver()
++	go doReceive(rcv, results)
++	snd.Close(want)
++	if r := <-results; want != r.err {
++		t.Errorf("want %#v got %#v", want, r)
++	}
++}
++
++// Test closing the server end of a connection.
++func TestConnectionCloseInterrupt1(t *testing.T) {
++	want := amqp.Errorf("x", "bad")
++	pairs := newPairs(t)
++	results := make(chan result, 4) // Buffered: 4 goroutines report but only 3 results are read below.
++
++	// Connection.Close() interrupts Send, Receive, Disposition.
++	snd, rcv := pairs.senderReceiver()
++	go doReceive(rcv, results)
++	sm, err := snd.Send(amqp.NewMessage())
++	fatalIf(t, err)
++	go doDisposition(sm, results)
++	snd, rcv = pairs.senderReceiver()
++	go doSend(snd, results)
++	rcv, snd = pairs.receiverSender()
++	go doReceive(rcv, results)
++	pairs.server.Close(want)
++	for i := 0; i < 3; i++ {
++		if r := <-results; want != r.err {
++			// TODO aconway 2015-10-06: Not propagating the correct error, seeing nil and EOF.
++			t.Logf("want %v got %v", want, r)
++		}
++	}
++}
++
++// Test closing the client end of the connection.
++func TestConnectionCloseInterrupt2(t *testing.T) {
++	want := amqp.Errorf("x", "bad")
++	pairs := newPairs(t)
++	results := make(chan result, 4) // Buffered: 4 goroutines report but only 3 results are read below.
++
++	// Connection.Close() interrupts Send, Receive, Disposition.
++	snd, rcv := pairs.senderReceiver()
++	go doReceive(rcv, results)
++	sm, err := snd.Send(amqp.NewMessage())
++	fatalIf(t, err)
++	go doDisposition(sm, results)
++	snd, rcv = pairs.senderReceiver()
++	go doSend(snd, results)
++	rcv, snd = pairs.receiverSender()
++	go doReceive(rcv, results)
++	pairs.client.Close(want)
++	for i := 0; i < 3; i++ {
++		if r := <-results; want != r.err {
++			// TODO aconway 2015-10-06: Not propagating the correct error, seeing nil.
++			t.Logf("want %v got %v", want, r)
++		}
++	}
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/receiver.go
----------------------------------------------------------------------
diff --cc electron/receiver.go
index 0000000,0000000..59ac018
new file mode 100644
--- /dev/null
+++ b/electron/receiver.go
@@@ -1,0 -1,0 +1,238 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"qpid.apache.org/amqp"
++	"qpid.apache.org/internal"
++	"qpid.apache.org/proton"
++	"time"
++)
++
++// Receiver is a Link that receives messages.
++//
++type Receiver interface {
++	Link
++
++	// Receive blocks until a message is available or until the Receiver is closed
++	// and has no more buffered messages.
++	Receive() (ReceivedMessage, error)
++
++	// ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
++	//
++	// Note that if Prefetch is false, after a Timeout the credit issued by
++	// Receive remains on the link. It will be used by the next call to Receive.
++	ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)
++
++	// Prefetch==true means the Receiver will automatically issue credit to the
++	// remote sender to keep its buffer as full as possible, i.e. it will
++	// "pre-fetch" messages independently of the application calling
++	// Receive(). This gives good throughput for applications that handle a
++	// continuous stream of messages. Larger capacity may improve throughput, the
++	// optimal value depends on the characteristics of your application.
++	//
++	// Prefetch==false means the Receiver will only issue credit when you
++	// call Receive(), and will only issue enough credit to satisfy the calls
++	// actually made. This gives lower throughput but will not fetch any messages
++	// in advance. It is good for synchronous applications that need to evaluate
++	// each message before deciding whether to receive another. The
++	// request-response pattern is a typical example.  If you make concurrent
++	// calls to Receive with pre-fetch disabled, you can improve performance by
++	// setting the capacity close to the expected number of concurrent calls.
++	//
++	Prefetch() bool
++
++	// Capacity is the size (number of messages) of the local message buffer.
++	// These are messages received but not yet returned to the application by a
++	// call to Receive().
++	Capacity() int
++}
++
++// Flow control policy for a receiver.
++//
++// ReceiveTimeout calls Pre before it waits on the receiver's buffer and calls
++// Post (from a deferred function) when the wait is over.
++type policy interface {
++	// Called at the start of Receive() to adjust credit before fetching a message.
++	Pre(*receiver)
++	// Called after Receive() has taken a message from the buffer, before it returns.
++	// Non-nil error means no message was received because of an error.
++	Post(*receiver, error)
++}
++
++// prefetchPolicy tops the link credit up to the free buffer space every time
++// it is consulted.
++type prefetchPolicy struct{}
++
++// Flow injects a function into the engine that issues credit for the free
++// buffer space reported by r.credit(), when there is any.
++func (p prefetchPolicy) Flow(r *receiver) {
++	topUp := func() {
++		if _, _, room := r.credit(); room > 0 {
++			r.eLink.Flow(room)
++		}
++	}
++	r.engine().Inject(topUp)
++}
++
++// Pre issues credit before a receive attempt.
++func (p prefetchPolicy) Pre(r *receiver) {
++	p.Flow(r)
++}
++
++// Post issues credit again, but only when the receive succeeded.
++func (p prefetchPolicy) Post(r *receiver, err error) {
++	if err != nil {
++		return
++	}
++	p.Flow(r)
++}
++
++// noPrefetchPolicy issues only as much credit as is needed to satisfy the
++// Receive calls that are currently waiting.
++//
++// waiting counts the calls that are between Pre and Post. The methods use a
++// pointer receiver so that updates to waiting persist; with value receivers
++// the increment in Pre and the decrement in Post were applied to a copy and
++// lost. The counter is only touched inside functions injected into the
++// engine, which are assumed to run one at a time in the proton goroutine, so
++// concurrent Receive calls do not race on it.
++type noPrefetchPolicy struct{ waiting int }
++
++// Flow injects a credit adjustment into the engine.
++// Not called in proton goroutine.
++func (p *noPrefetchPolicy) Flow(r *receiver) {
++	r.engine().Inject(func() { p.flow(r) })
++}
++
++// flow issues credit so that buffered messages plus outstanding credit cover
++// the waiting calls, never exceeding the free buffer space.
++// Call in proton goroutine.
++func (p *noPrefetchPolicy) flow(r *receiver) {
++	buffered, credit, max := r.credit()
++	add := p.waiting - (buffered + credit)
++	if add > max {
++		add = max // Don't overflow
++	}
++	if add > 0 {
++		r.eLink.Flow(add)
++	}
++}
++
++// Pre registers one more waiting call and issues credit for it if needed.
++func (p *noPrefetchPolicy) Pre(r *receiver) {
++	r.engine().Inject(func() {
++		p.waiting++
++		p.flow(r)
++	})
++}
++
++// Post unregisters a waiting call. Credit is only re-evaluated when the call
++// succeeded (err == nil).
++func (p *noPrefetchPolicy) Post(r *receiver, err error) {
++	r.engine().Inject(func() {
++		p.waiting--
++		if err == nil {
++			p.flow(r)
++		}
++	})
++}
++
++// Receiver implementation
++type receiver struct {
++	link
++	// buffer holds received messages until ReceiveTimeout takes them.
++	buffer chan ReceivedMessage
++	// policy is the flow control policy, chosen in newReceiver from prefetch.
++	policy policy
++}
++
++// newReceiver creates the receiver for link l, registers it with the handler
++// and opens the link. A capacity below 1 is raised to 1, and the buffer is
++// created with that capacity.
++func newReceiver(l link) *receiver {
++	r := &receiver{link: l}
++	if r.capacity < 1 {
++		r.capacity = 1
++	}
++	if r.prefetch {
++		r.policy = &prefetchPolicy{}
++	} else {
++		r.policy = &noPrefetchPolicy{}
++	}
++	r.buffer = make(chan ReceivedMessage, r.capacity)
++	r.handler().addLink(r.eLink, r)
++	r.link.open()
++	return r
++}
++
++// credit returns the number of buffered messages, the current link credit and
++// the free space left in the buffer.
++// call in proton goroutine.
++func (r *receiver) credit() (buffered, credit, max int) {
++	return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer)
++}
++
++// Capacity returns the capacity of the message buffer.
++func (r *receiver) Capacity() int  { return cap(r.buffer) }
++// Prefetch returns the prefetch setting of the link.
++func (r *receiver) Prefetch() bool { return r.prefetch }
++
++// Receive is ReceiveTimeout with a timeout of Forever.
++func (r *receiver) Receive() (rm ReceivedMessage, err error) {
++	return r.ReceiveTimeout(Forever)
++}
++
++// ReceiveTimeout waits up to timeout for a message from the buffer.
++//
++// It returns Timeout if timedReceive times out, and r.Error() if the buffer
++// channel has been closed. It asserts (panics) if the buffer is nil.
++func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
++	internal.Assert(r.buffer != nil, "Receiver is not open: %s", r)
++	r.policy.Pre(r)
++	// err is the named result, so the deferred Post sees the error that is
++	// finally returned, not the one from timedReceive.
++	defer func() { r.policy.Post(r, err) }()
++	// := declares rmi only; err is the named result declared in the same scope.
++	rmi, err := timedReceive(r.buffer, timeout)
++	switch err {
++	case Timeout:
++		return ReceivedMessage{}, Timeout
++	case Closed:
++		return ReceivedMessage{}, r.Error()
++	default:
++		return rmi.(ReceivedMessage), nil
++	}
++}
++
++// Called in proton goroutine on MMessage event.
++//
++// Decodes the message of delivery and puts it in the buffer. The link is
++// closed locally instead if the remote end has closed, if decoding fails, or
++// if the link credit is negative after advancing.
++func (r *receiver) message(delivery proton.Delivery) {
++	if r.eLink.State().RemoteClosed() {
++		localClose(r.eLink, r.eLink.RemoteCondition().Error())
++		return
++	}
++	if delivery.HasMessage() {
++		m, err := delivery.Message()
++		if err != nil {
++			localClose(r.eLink, err)
++			return
++		}
++		internal.Assert(m != nil)
++		r.eLink.Advance()
++		if r.eLink.Credit() < 0 {
++			localClose(r.eLink, internal.Errorf("received message in excess of credit limit"))
++		} else {
++			// We never issue more credit than cap(buffer) so this will not block.
++			r.buffer <- ReceivedMessage{m, delivery, r}
++		}
++	}
++}
++
++// closed records the close on the embedded link and then closes the buffer
++// channel, if there is one, so that blocked receives return.
++func (r *receiver) closed(err error) {
++	r.link.closed(err)
++	if r.buffer != nil {
++		close(r.buffer)
++	}
++}
++
++// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
++type ReceivedMessage struct {
++	// Message is the received message.
++	Message amqp.Message
++
++	// eDelivery is the delivery the message arrived on; Acknowledge settles it.
++	eDelivery proton.Delivery
++	// receiver is the Receiver that produced this message.
++	receiver  Receiver
++}
++
++// Acknowledge a ReceivedMessage with the given disposition code.
++//
++// The settlement runs inside the engine via InjectWait; the returned error is
++// the receiver's Error() after settling.
++func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error {
++	return rm.receiver.(*receiver).engine().InjectWait(func() error {
++		// Settle doesn't return an error but if the receiver is broken the settlement won't happen.
++		rm.eDelivery.SettleAs(uint64(disposition))
++		return rm.receiver.Error()
++	})
++}
++
++// Accept is short for Acknowledge(Accepted)
++func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
++
++// Reject is short for Acknowledge(Rejected)
++func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
++
++// IncomingReceiver is passed to the accept() function given to Connection.Listen()
++// when there is an incoming request for a receiver link.
++type IncomingReceiver struct {
++	incomingLink
++}
++
++// Link provides information about the incoming link.
++func (i *IncomingReceiver) Link() Link { return i }
++
++// AcceptReceiver sets Capacity and Prefetch of the accepted Receiver, then
++// accepts the link and returns the Receiver.
++func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver {
++	i.capacity = capacity
++	i.prefetch = prefetch
++	return i.Accept().(Receiver)
++}
++
++// Accept marks the incoming link as accepted and creates a receiver for it
++// with the current capacity and prefetch settings.
++func (i *IncomingReceiver) Accept() Endpoint {
++	i.accepted = true
++	return newReceiver(i.link)
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/sender.go
----------------------------------------------------------------------
diff --cc electron/sender.go
index 0000000,0000000..68cfbb3
new file mode 100644
--- /dev/null
+++ b/electron/sender.go
@@@ -1,0 -1,0 +1,315 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++// #include <proton/disposition.h>
++import "C"
++
++import (
++	"qpid.apache.org/amqp"
++	"qpid.apache.org/internal"
++	"qpid.apache.org/proton"
++	"reflect"
++	"time"
++)
++
++// Sender is a Link that sends messages.
++type Sender interface {
++	Link
++
++	// Send a message without waiting for acknowledgement. Returns a SentMessage.
++	// Use SentMessage.Disposition() to wait for acknowledgement and get the
++	// disposition code.
++	//
++	// If the send buffer is full, Send blocks until there is space in the buffer.
++	Send(m amqp.Message) (sm SentMessage, err error)
++
++	// SendTimeout is like Send but only waits up to timeout for buffer space.
++	//
++	// Returns Timeout error if the timeout expires and the message has not been sent.
++	SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err error)
++
++	// Send a message and forget it, there will be no acknowledgement.
++	// If the send buffer is full, SendForget blocks until there is space in the buffer.
++	SendForget(m amqp.Message) error
++
++	// SendForgetTimeout is like SendForget but only waits up to timeout for buffer space.
++	// Returns Timeout error if the timeout expires and the message has not been sent.
++	SendForgetTimeout(m amqp.Message, timeout time.Duration) error
++
++	// Credit indicates how many messages the receiving end of the link can accept.
++	//
++	// On a Sender credit can be negative, meaning that messages in excess of the
++	// receiver's credit limit have been buffered locally till credit is available.
++	Credit() (int, error)
++}
++
++// sendMessage pairs a message to send with the SentMessage that tracks it.
++// sm is nil when no acknowledgement is wanted; doSend then settles the
++// delivery immediately.
++type sendMessage struct {
++	m  amqp.Message
++	sm SentMessage
++}
++
++// Sender implementation
++type sender struct {
++	link
++	credit chan struct{} // Signal available credit.
++}
++
++// Disposition indicates the outcome of a settled message delivery.
++type Disposition uint64
++
++const (
++	// No disposition available: pre-settled, not yet acknowledged or an error occurred
++	NoDisposition Disposition = 0
++	// NOTE(review): the constants below take their type from the proton package
++	// constants, not from Disposition -- confirm this is intended.
++
++	// Message was accepted by the receiver
++	Accepted = proton.Accepted
++	// Message was rejected as invalid by the receiver
++	Rejected = proton.Rejected
++	// Message was not processed by the receiver but may be processed by some other receiver.
++	Released = proton.Released
++)
++
++// String returns a human readable name for a Disposition, or "unknown" for
++// values other than the constants above.
++func (d Disposition) String() string {
++	switch d {
++	case NoDisposition:
++		return "no-disposition"
++	case Accepted:
++		return "accepted"
++	case Rejected:
++		return "rejected"
++	case Released:
++		return "released"
++	default:
++		return "unknown"
++	}
++}
++
++// Send is SendTimeout with a timeout of Forever.
++func (s *sender) Send(m amqp.Message) (SentMessage, error) {
++	return s.SendTimeout(m, Forever)
++}
++
++// SendTimeout sends m, waiting up to timeout for credit. On a link that is
++// not pre-settled the message is tracked by a new SentMessage; on a
++// pre-settled link (SndSettled) no SentMessage is created.
++func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) (SentMessage, error) {
++	var tracker SentMessage // Stays a nil interface for pre-settled links.
++	if s.sndSettle != SndSettled {
++		tracker = newSentMessage(s.session.connection)
++	}
++	return s.sendInternal(sendMessage{m: m, sm: tracker}, timeout)
++}
++
++// SendForget is SendForgetTimeout with a timeout of Forever.
++func (s *sender) SendForget(m amqp.Message) error {
++	return s.SendForgetTimeout(m, Forever)
++}
++
++// SendForgetTimeout sends m without a SentMessage, waiting up to timeout for
++// credit, and returns only the error.
++func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error {
++	_, err := s.sendInternal(sendMessage{m: m}, timeout)
++	return err
++}
++
++// sendInternal waits up to timeout for the credit signal and then injects
++// doSend into the engine.
++//
++// Returns the error from timedReceive (replaced by s.Error() when the credit
++// channel is closed) or the error from Inject; otherwise returns snd.sm.
++func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) {
++	if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit
++		if err == Closed {
++			err = s.Error()
++			internal.Assert(err != nil)
++		}
++		return nil, err
++	}
++	if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil {
++		return nil, err
++	}
++	return snd.sm, nil
++}
++
++// Send a message. Handler goroutine
++//
++// With no SentMessage the delivery is settled at once and the error from
++// eLink.Send is not reported. With a *sentMessage the delivery is recorded on
++// it; on error it is settled with that error, otherwise it is registered in
++// the handler's sentMessages map.
++func (s *sender) doSend(snd sendMessage) {
++	delivery, err := s.eLink.Send(snd.m)
++	switch sm := snd.sm.(type) {
++	case nil:
++		delivery.Settle()
++	case *sentMessage:
++		sm.delivery = delivery
++		if err != nil {
++			sm.settled(err)
++		} else {
++			s.handler().sentMessages[delivery] = sm
++		}
++	default:
++		internal.Assert(false, "bad SentMessage type %T", snd.sm)
++	}
++	// The credit signal was consumed by sendInternal; raise it again if the
++	// link still has credit so the next send can proceed.
++	if s.eLink.Credit() > 0 {
++		s.sendable() // Signal credit.
++	}
++}
++
++// Signal the sender has credit. Any goroutine.
++//
++// The credit channel has capacity 1 (see newSender), so it acts as a flag:
++// the send is dropped if the flag is already set.
++func (s *sender) sendable() {
++	select { // Non-blocking
++	case s.credit <- struct{}{}: // Set the flag if not already set.
++	default:
++	}
++}
++
++// closed records the close on the embedded link and closes the credit
++// channel, which makes a pending timedReceive in sendInternal return Closed.
++func (s *sender) closed(err error) {
++	s.link.closed(err)
++	close(s.credit)
++}
++
++// newSender creates the sender for link l, registers it with the handler and
++// opens the link.
++func newSender(l link) *sender {
++	s := &sender{link: l, credit: make(chan struct{}, 1)}
++	s.handler().addLink(s.eLink, s)
++	s.link.open()
++	return s
++}
++
++// SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
++type SentMessage interface {
++
++	// Disposition blocks till the message is acknowledged and returns the
++	// disposition state.
++	//
++	// NoDisposition with Error() != nil means the Connection was closed before
++	// the message was acknowledged.
++	//
++	// NoDisposition with Error() == nil means the message was pre-settled or
++	// Forget() was called.
++	Disposition() (Disposition, error)
++
++	// DispositionTimeout is like Disposition but gives up after timeout, see Timeout.
++	DispositionTimeout(time.Duration) (Disposition, error)
++
++	// Forget interrupts any call to Disposition on this SentMessage and tells the
++	// peer we are no longer interested in the disposition of this message.
++	Forget()
++
++	// Error returns the error that closed the disposition, or nil if there was no error.
++	// If the disposition closed because the connection closed, it will return Closed.
++	Error() error
++
++	// Value is an optional value you wish to associate with the SentMessage. It
++	// can be the message itself or some form of identifier.
++	Value() interface{}
++	// SetValue sets the value returned by Value.
++	SetValue(interface{})
++}
++
++// SentMessageSet is a set of sent messages that can be checked to get the
++// next completed sent message.
++//
++// Invariant between calls: cases[i] is the receive case for sm[i]; any extra
++// trailing case is a stale timeout case and is discarded before use.
++//
++// NOTE(review): the set does no locking of its own; confirm callers
++// serialize access before using it from multiple goroutines.
++type SentMessageSet struct {
++	cases []reflect.SelectCase
++	sm    []SentMessage
++	done  chan SentMessage
++}
++
++// Add adds sm to the set. sm must have been returned by this package's Send
++// (a *sentMessage); anything else, including nil, panics.
++func (s *SentMessageSet) Add(sm SentMessage) {
++	recv := reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sm.(*sentMessage).done)}
++	// Drop any timeout case left by a previous Wait so cases stays aligned with sm.
++	s.cases = append(s.cases[:len(s.sm)], recv)
++	s.sm = append(s.sm, sm)
++}
++
++// Wait waits up to timeout and returns the next SentMessage that has a valid
++// disposition or an error. The returned message is removed from the set.
++//
++// timeout == 0 means do not block. Returns nil and Timeout if no message in
++// the set completes in time. The sm parameter is unused.
++func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) (SentMessage, error) {
++	n := len(s.sm)
++	var last reflect.SelectCase
++	if timeout == 0 { // Non-blocking
++		last = reflect.SelectCase{Dir: reflect.SelectDefault}
++	} else {
++		last = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))}
++	}
++	// The timeout/default case always sits at index n, after the message cases.
++	chosen, _, _ := reflect.Select(append(s.cases[:n], last))
++	s.cases = s.cases[:n] // Remove the timeout case again.
++	if chosen >= n {
++		return nil, Timeout
++	}
++	completed := s.sm[chosen]
++	// Remove the message and its select case together to keep them aligned.
++	s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...)
++	s.cases = append(s.cases[:chosen], s.cases[chosen+1:]...)
++	return completed, nil
++}
++
++// SentMessage implementation
++type sentMessage struct {
++	connection  *connection
++	// done is closed by finish() once the outcome is known.
++	done        chan struct{}
++	// delivery is set by sender.doSend.
++	delivery    proton.Delivery
++	disposition Disposition
++	err         error
++	value       interface{}
++}
++
++// newSentMessage creates a sentMessage for connection c with an open done channel.
++func newSentMessage(c *connection) *sentMessage {
++	return &sentMessage{connection: c, done: make(chan struct{})}
++}
++
++func (sm *sentMessage) SetValue(v interface{}) { sm.value = v }
++func (sm *sentMessage) Value() interface{}     { return sm.value }
++
++// Disposition blocks until done is closed, then returns the recorded
++// disposition and error.
++func (sm *sentMessage) Disposition() (Disposition, error) {
++	<-sm.done
++	return sm.disposition, sm.err
++}
++
++// DispositionTimeout waits up to timeout for done to be closed. It returns
++// Timeout only when timedReceive reports Timeout; any other result returns
++// the recorded error.
++func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
++	if _, err := timedReceive(sm.done, timeout); err == Timeout {
++		return sm.disposition, Timeout
++	} else {
++		return sm.disposition, sm.err
++	}
++}
++
++// Forget injects a function that settles the delivery and removes it from the
++// handler's sentMessages map, then marks this message finished without
++// waiting for that function to run.
++func (sm *sentMessage) Forget() {
++	sm.connection.engine.Inject(func() {
++		sm.delivery.Settle()
++		delete(sm.connection.handler.sentMessages, sm.delivery)
++	})
++	sm.finish()
++}
++
++// settled records the remote disposition (if the delivery is settled) and
++// err, then marks the message finished.
++func (sm *sentMessage) settled(err error) {
++	if sm.delivery.Settled() {
++		sm.disposition = Disposition(sm.delivery.Remote().Type())
++	}
++	sm.err = err
++	sm.finish()
++}
++
++// finish closes done unless it is already closed.
++//
++// NOTE(review): the check and the close are separate steps. Forget calls
++// finish from the caller's goroutine while settled is presumably called from
++// the handler goroutine -- confirm two concurrent calls cannot both reach close.
++func (sm *sentMessage) finish() {
++	select {
++	case <-sm.done: // No-op if already closed
++	default:
++		close(sm.done)
++	}
++}
++
++// Error returns the recorded error without waiting for completion.
++func (sm *sentMessage) Error() error { return sm.err }
++
++// IncomingSender is passed to the accept() function given to Connection.Listen()
++// when there is an incoming request for a sender link.
++type IncomingSender struct {
++	incomingLink
++}
++
++// Link provides information about the incoming link.
++func (i *IncomingSender) Link() Link { return i }
++
++// AcceptSender accepts the link and returns it as a Sender.
++func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) }
++
++// Accept marks the incoming link as accepted and creates a sender for it.
++func (i *IncomingSender) Accept() Endpoint {
++	i.accepted = true
++	return newSender(i.link)
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/session.go
----------------------------------------------------------------------
diff --cc electron/session.go
index 0000000,0000000..3531da6
new file mode 100644
--- /dev/null
+++ b/electron/session.go
@@@ -1,0 -1,0 +1,125 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"qpid.apache.org/proton"
++)
++
++// Session is an AMQP session, it contains Senders and Receivers.
++type Session interface {
++	Endpoint
++
++	// Sender opens a new sender, configured by the given LinkSetting values.
++	// See functions that return LinkSetting for details.
++	Sender(...LinkSetting) (Sender, error)
++
++	// Receiver opens a new Receiver, configured by the given LinkSetting
++	// values. See functions that return LinkSetting for details.
++	Receiver(...LinkSetting) (Receiver, error)
++}
++
++// Session implementation
++type session struct {
++	endpoint
++	eSession   proton.Session
++	connection *connection
++	// capacity is passed to eSession.SetIncomingCapacity in newSession.
++	capacity   uint
++}
++
++// SessionSetting can be passed when creating a session (see newSession).
++// See functions that return SessionSetting for details
++type SessionSetting func(*session)
++
++// IncomingCapacity sets the size (in bytes) of the session's incoming data buffer.
++func IncomingCapacity(cap uint) SessionSetting { return func(s *session) { s.capacity = cap } }
++
++// newSession creates a session for es on connection c, applies the settings,
++// registers the session with the connection's handler and opens it.
++// Call in proton goroutine.
++func newSession(c *connection, es proton.Session, setting ...SessionSetting) *session {
++	s := &session{
++		connection: c,
++		eSession:   es,
++		endpoint:   endpoint{str: es.String()},
++	}
++	for _, set := range setting {
++		set(s)
++	}
++	c.handler.sessions[s.eSession] = s
++	// Settings are applied above, so s.capacity is final here.
++	s.eSession.SetIncomingCapacity(s.capacity)
++	s.eSession.Open()
++	return s
++}
++
++func (s *session) Connection() Connection     { return s.connection }
++func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
++func (s *session) engine() *proton.Engine     { return s.connection.engine }
++
++// Close injects a local close of the session with err into the engine; it
++// does not wait for the close to happen.
++func (s *session) Close(err error) {
++	s.engine().Inject(func() { localClose(s.eSession, err) })
++}
++
++// SetCapacity stores the capacity on the session struct only; the proton
++// session's incoming capacity is set from it in newSession.
++func (s *session) SetCapacity(bytes uint) { s.capacity = bytes }
++
++// Sender creates a local link with the given settings and a sender on it,
++// inside the engine, and waits for the result.
++func (s *session) Sender(setting ...LinkSetting) (snd Sender, err error) {
++	err = s.engine().InjectWait(func() error {
++		// This err is local to the closure; it reaches the caller as the
++		// return value of InjectWait.
++		l, err := localLink(s, true, setting...)
++		if err == nil {
++			snd = newSender(l)
++		}
++		return err
++	})
++	return
++}
++
++// Receiver creates a local link with the given settings and a receiver on it,
++// inside the engine, and waits for the result.
++func (s *session) Receiver(setting ...LinkSetting) (rcv Receiver, err error) {
++	err = s.engine().InjectWait(func() error {
++		// This err is local to the closure; it reaches the caller as the
++		// return value of InjectWait.
++		l, err := localLink(s, false, setting...)
++		if err == nil {
++			rcv = newReceiver(l)
++		}
++		return err
++	})
++	return
++}
++
++// Called from handler on closed.
++//
++// Sets err and then Closed on the session's error holder.
++// NOTE(review): presumably the holder keeps the first non-nil value, making
++// Closed the fallback when err is nil -- confirm against the type of s.err.
++func (s *session) closed(err error) {
++	s.err.Set(err)
++	s.err.Set(Closed)
++}
++
++// IncomingSession is passed to the accept() function given to Connection.Listen()
++// when there is an incoming session request.
++type IncomingSession struct {
++	incoming
++	h        *handler
++	pSession proton.Session
++	capacity uint
++}
++
++// AcceptSession sets the session buffer capacity of an incoming session in
++// bytes, then accepts it and returns the Session.
++func (i *IncomingSession) AcceptSession(bytes uint) Session {
++	i.capacity = bytes
++	return i.Accept().(Session)
++}
++
++// Accept marks the incoming session as accepted and creates the session on
++// the handler's connection with the current capacity.
++func (i *IncomingSession) Accept() Endpoint {
++	i.accepted = true
++	return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity))
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/time.go
----------------------------------------------------------------------
diff --cc electron/time.go
index 0000000,0000000..3407b82
new file mode 100644
--- /dev/null
+++ b/electron/time.go
@@@ -1,0 -1,0 +1,82 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++	"qpid.apache.org/internal"
++	"reflect"
++	"time"
++)
++
++// Timeout is the error returned if an operation does not complete on time.
++//
++// Methods named *Timeout in this package take time.Duration timeout parameter.
++//
++// If timeout > 0 and there is no result available before the timeout, they
++// return a zero or nil value and Timeout as an error.
++//
++// If timeout == 0 they will return a result if one is immediately available or
++// nil/zero and Timeout as an error if not.
++//
++// If timeout == Forever the function will return only when there is a result or
++// some non-timeout error occurs.
++//
++var Timeout = internal.Errorf("timeout")
++
++// Forever can be used as a timeout parameter to indicate wait forever.
++const Forever time.Duration = -1
++
++// timedReceive receives on channel (which can be a chan of any type), waiting
++// up to timeout.
++//
++// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block
++// forever. Other values mean block up to the timeout.
++//
++// Returns the received value and nil on success, error Timeout on timeout
++// (including a non-blocking attempt that finds nothing), Closed on channel close.
++func timedReceive(channel interface{}, timeout time.Duration) (interface{}, error) {
++	cases := []reflect.SelectCase{
++		reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)},
++	}
++	if timeout == 0 { // Non-blocking
++		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})
++	} else { // Block up to timeout
++		cases = append(cases,
++			reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
++	}
++	chosen, value, ok := reflect.Select(cases)
++	switch {
++	case chosen != 0:
++		// Default case or timer fired. This must be checked before ok:
++		// reflect.Select reports ok == false for a default case, which would
++		// otherwise be mistaken for a closed channel.
++		return nil, Timeout
++	case !ok:
++		return nil, Closed
++	default:
++		return value.Interface(), nil
++	}
++}
++
++// After behaves like time.After except that timeout == Forever yields a nil
++// channel, and a select on a nil channel never fires.
++func After(timeout time.Duration) <-chan time.Time {
++	if timeout != Forever {
++		return time.After(timeout)
++	}
++	return nil
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/error.go
----------------------------------------------------------------------
diff --cc internal/error.go
index 0000000,0000000..1b108e6
new file mode 100644
--- /dev/null
+++ b/internal/error.go
@@@ -1,0 -1,0 +1,118 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++// Internal implementation details - ignore.
++package internal
++
++// #cgo LDFLAGS: -lqpid-proton
++// #include <proton/error.h>
++// #include <proton/codec.h>
++import "C"
++
++import (
++	"fmt"
++	"sync"
++	"sync/atomic"
++	"unsafe"
++)
++
++// Error type for proton runtime errors returned as error values.
++// The string is the message without the "proton: " prefix.
++type Error string
++
++// Error prefixes error message with proton:
++func (e Error) Error() string {
++	return "proton: " + string(e)
++}
++
++// Errorf creates an Error with a formatted message; format and a are passed
++// to fmt.Sprintf.
++func Errorf(format string, a ...interface{}) Error {
++	return Error(fmt.Sprintf(format, a...))
++}
++
++// PnErrorCode is an error code from the proton C library (the PN_* constants).
++type PnErrorCode int
++
++// String returns a short name for the code, or "unknown-error(N)" for codes
++// that are not listed here.
++func (e PnErrorCode) String() string {
++	switch e {
++	case C.PN_EOS:
++		return "end-of-data"
++	case C.PN_ERR:
++		return "error"
++	case C.PN_OVERFLOW:
++		return "overflow"
++	case C.PN_UNDERFLOW:
++		return "underflow"
++	case C.PN_STATE_ERR:
++		return "bad-state"
++	case C.PN_ARG_ERR:
++		return "invalid-argument"
++	case C.PN_TIMEOUT:
++		return "timeout"
++	case C.PN_INTR:
++		return "interrupted"
++	case C.PN_INPROGRESS:
++		return "in-progress"
++	default:
++		return fmt.Sprintf("unknown-error(%d)", e)
++	}
++}
++
++// PnError converts p, treated as a pointer to a C pn_error_t, into a Go error
++// built from the error code and text. Returns nil if p is nil or the code is 0.
++func PnError(p unsafe.Pointer) error {
++	e := (*C.pn_error_t)(p)
++	if e == nil || C.pn_error_code(e) == 0 {
++		return nil
++	}
++	return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e)))
++}
++
++// panicIf panics if condition is true, the panic value is Errorf(fmt, args...)
++// Note the fmt parameter shadows the fmt package inside this function.
++func panicIf(condition bool, fmt string, args ...interface{}) {
++	if condition {
++		panic(Errorf(fmt, args...))
++	}
++}
++
++// ErrorHolder is a goroutine-safe error holder that keeps the first error that is set.
++type ErrorHolder struct {
++	// once ensures that value is stored at most once.
++	once  sync.Once
++	value atomic.Value
++}
++
++// Set the error if not already set. A nil err is ignored and does not count
++// as the first error.
++func (e *ErrorHolder) Set(err error) {
++	if err != nil {
++		e.once.Do(func() { e.value.Store(err) })
++	}
++}
++
++// Get the error, or nil if no error has been set.
++func (e *ErrorHolder) Get() (err error) {
++	// The type assertion yields nil when nothing has been stored yet.
++	err, _ = e.value.Load().(error)
++	return
++}
++
++// Assert does nothing when condition holds. Otherwise it panics with an Error:
++// if arguments are supplied, the first must be a format string and the rest
++// its operands; with no arguments the message is "assertion failed".
++func Assert(condition bool, format ...interface{}) {
++	if condition {
++		return
++	}
++	if len(format) == 0 {
++		panic(Errorf("assertion failed"))
++	}
++	panic(Errorf(format[0].(string), format[1:]...))
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/flexchannel.go
----------------------------------------------------------------------
diff --cc internal/flexchannel.go
index 0000000,0000000..77b524c
new file mode 100644
--- /dev/null
+++ b/internal/flexchannel.go
@@@ -1,0 -1,0 +1,82 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package internal
++
++// FlexChannel acts like a channel with an automatically sized buffer, see NewFlexChannel().
++type FlexChannel struct {
++	// In channel to send to. close(In) will close the FlexChannel once buffer has drained.
++	In chan<- interface{}
++	// Out channel to receive from. Out closes when In has closed and the buffer is empty.
++	Out <-chan interface{}
++
++	// in and out are the bidirectional channels behind In and Out; both are unbuffered.
++	in, out chan interface{}
++	// buffer holds values received from in and not yet sent to out, oldest first.
++	buffer  []interface{}
++	limit   int
++}
++
++// NewFlexChannel creates a FlexChannel, a channel with an automatically-sized buffer.
++//
++// Initially the buffer size is 0, the buffer grows as needed up to limit. limit < 0 means
++// there is no limit.
++//
++// A goroutine is started to move values from In to Out; it exits after In is
++// closed and the remaining buffered values have been delivered to Out.
++//
++// limit == 0 is not usable: the goroutine's assertion fails on its first
++// iteration because it can neither accept nor deliver a value.
++//
++func NewFlexChannel(limit int) *FlexChannel {
++	fc := &FlexChannel{
++		in:     make(chan interface{}),
++		out:    make(chan interface{}),
++		buffer: make([]interface{}, 0),
++		limit:  limit,
++	}
++	fc.In = fc.in
++	fc.Out = fc.out
++	go fc.run()
++	return fc
++}
++
++// run is the goroutine started by NewFlexChannel. It moves values from in to
++// out through buffer until in is closed, then flushes what is left and
++// closes out.
++func (fc *FlexChannel) run() {
++	defer func() { // Flush the channel on exit
++		for _, data := range fc.buffer {
++			fc.out <- data
++		}
++		close(fc.out)
++	}()
++
++	for {
++		// A nil channel in a select case is never ready, so leaving usein or
++		// useout nil disables that case for this iteration.
++		var usein, useout chan interface{}
++		var outvalue interface{}
++		if len(fc.buffer) > 0 {
++			useout = fc.out
++			outvalue = fc.buffer[0]
++		}
++		if len(fc.buffer) < fc.limit || fc.limit < 0 {
++			usein = fc.in
++		}
++		// With limit == 0 and an empty buffer both stay nil and this panics.
++		Assert(usein != nil || useout != nil)
++		select {
++		case useout <- outvalue:
++			fc.buffer = fc.buffer[1:]
++		case data, ok := <-usein:
++			if ok {
++				fc.buffer = append(fc.buffer, data)
++			} else {
++				// in was closed: the deferred function flushes and closes out.
++				return
++			}
++		}
++	}
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/flexchannel_test.go
----------------------------------------------------------------------
diff --cc internal/flexchannel_test.go
index 0000000,0000000..d0e1a44
new file mode 100644
--- /dev/null
+++ b/internal/flexchannel_test.go
@@@ -1,0 -1,0 +1,89 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package internal
++
++import (
++	"testing"
++)
++
++// recvall receives every value that is immediately available on ch, without
++// blocking, and returns them in the order received. It stops as soon as no
++// value is ready or ch is found to be closed.
++func recvall(ch <-chan interface{}) (result []interface{}) {
++	for {
++		select {
++		case x, ok := <-ch:
++			if !ok {
++				// A closed channel is always ready to receive, without this
++				// check the loop would append zero values forever.
++				return
++			}
++			result = append(result, x)
++		default:
++			return
++		}
++	}
++}
++
++// sendall is an empty stub: it currently ignores both data and ch and sends
++// nothing.
++func sendall(data []interface{}, ch chan<- interface{}) {
++}
++
++// TestFlex checks that a FlexChannel delivers values in order, blocks senders
++// once the buffer limit is reached, and closes Out after In is closed and the
++// buffer has drained.
++func TestFlex(t *testing.T) {
++	fc := NewFlexChannel(5)
++
++	// Test send/receive
++	go func() {
++		for i := 0; i < 4; i++ {
++			fc.In <- i
++		}
++	}()
++
++	for i := 0; i < 4; i++ {
++		j := <-fc.Out
++		if i != j {
++			t.Errorf("%v != %v", i, j)
++		}
++	}
++	select {
++	case x, ok := <-fc.Out:
++		t.Error("receive empty channel got", x, ok)
++	default:
++	}
++
++	// Test buffer limit
++	for i := 10; i < 15; i++ {
++		fc.In <- i
++	}
++	select {
++	case fc.In <- 0:
++		t.Error("send to full channel did not block")
++	default:
++	}
++	i := <-fc.Out
++	if i != 10 {
++		t.Errorf("%v != %v", i, 10)
++	}
++	fc.In <- 15
++	close(fc.In)
++
++	// Values buffered before the close must still be delivered in order.
++	for i := 11; i < 16; i++ {
++		j := <-fc.Out
++		if i != j {
++			t.Errorf("%v != %v", i, j)
++		}
++	}
++
++	// Out must be closed once the buffer is empty.
++	x, ok := <-fc.Out
++	if ok {
++		t.Error("Unexpected value on Out", x)
++	}
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/safemap.go
----------------------------------------------------------------------
diff --cc internal/safemap.go
index 0000000,0000000..3a1fe2b
new file mode 100644
--- /dev/null
+++ b/internal/safemap.go
@@@ -1,0 -1,0 +1,57 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package internal
++
++import (
++	"sync"
++)
++
++// SafeMap is a map from interface{} to interface{} whose methods serialize
++// access with a mutex, so it can be shared between goroutines.
++type SafeMap struct {
++	m    map[interface{}]interface{}
++	lock sync.Mutex
++}
++
++// MakeSafeMap returns an empty SafeMap that is ready to use.
++func MakeSafeMap() SafeMap {
++	return SafeMap{
++		m: make(map[interface{}]interface{}),
++	}
++}
++
++// Get returns the value stored under key, or nil if there is none.
++func (sm *SafeMap) Get(key interface{}) interface{} {
++	sm.lock.Lock()
++	defer sm.lock.Unlock()
++	value := sm.m[key]
++	return value
++}
++
++// GetOk returns the value stored under key and whether the key was present.
++func (sm *SafeMap) GetOk(key interface{}) (interface{}, bool) {
++	sm.lock.Lock()
++	defer sm.lock.Unlock()
++	value, found := sm.m[key]
++	return value, found
++}
++
++// Put stores value under key, replacing any previous value.
++func (sm *SafeMap) Put(key, value interface{}) {
++	sm.lock.Lock()
++	defer sm.lock.Unlock()
++	sm.m[key] = value
++}
++
++// Delete removes key from the map, it is a no-op if key is absent.
++func (sm *SafeMap) Delete(key interface{}) {
++	sm.lock.Lock()
++	defer sm.lock.Unlock()
++	delete(sm.m, key)
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/uuid.go
----------------------------------------------------------------------
diff --cc internal/uuid.go
index 0000000,0000000..ef941a1
new file mode 100644
--- /dev/null
+++ b/internal/uuid.go
@@@ -1,0 -1,0 +1,70 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package internal
++
++import (
++	"fmt"
++	"math/rand"
++	"strconv"
++	"sync"
++	"sync/atomic"
++	"time"
++)
++
++// UUID is a 16 byte universally unique identifier.
++type UUID [16]byte
++
++// String formats the UUID as five groups of upper-case hex digits separated
++// by dashes, the groups hold 4, 2, 2, 2 and 6 bytes respectively.
++func (u UUID) String() string {
++	b := u[:]
++	return fmt.Sprintf("%X-%X-%X-%X-%X", b[:4], b[4:6], b[6:8], b[8:10], b[10:])
++}
++
++// Use a private random source, seeded from the clock at package
++// initialization, so the default math/rand source is left untouched.
++var randomSource = rand.NewSource(time.Now().UnixNano())
++
++// randomLock serializes calls to randomSource in random().
++var randomLock sync.Mutex
++
++// random returns one pseudo-random byte: the low 8 bits of the next Int63()
++// from randomSource. It takes randomLock for the duration of the call.
++func random() byte {
++	randomLock.Lock()
++	defer randomLock.Unlock()
++	return byte(randomSource.Int63())
++}
++
++// UUID4 returns a version 4 UUID: 16 bytes from random() with the version and
++// variant bits overwritten as described in RFC 4122 section 4.4.
++//
++// The bytes come from a math/rand source, not from crypto/rand.
++func UUID4() UUID {
++	var u UUID
++	for i := 0; i < len(u); i++ {
++		u[i] = random()
++	}
++	// See https://tools.ietf.org/html/rfc4122#section-4.4
++	u[6] = (u[6] & 0x0F) | 0x40 // Version bits (top four) set to 4
++	u[8] = (u[8] & 0x3F) | 0x80 // Top two bits set to 1 and 0 (bit 7 = 1, bit 6 = 0)
++	return u
++}
++
++// IdCounter is a simple atomic counter used to generate unique 64 bit IDs.
++// The zero value is ready to use, the first ID produced is 1.
++type IdCounter struct {
++	count uint64
++}
++
++// NextInt atomically increments the counter and returns the new value.
++func (c *IdCounter) NextInt() uint64 {
++	next := atomic.AddUint64(&c.count, 1)
++	return next
++}
++
++// Next returns the next counter value formatted in base 32, the result is
++// safe to use in NUL terminated C strings.
++func (c *IdCounter) Next() string {
++	id := c.NextInt()
++	return strconv.FormatUint(id, 32)
++}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/proton/doc.go
----------------------------------------------------------------------
diff --cc proton/doc.go
index 34f85fe,0000000..51f70f8
mode 100644,000000..100644
--- a/proton/doc.go
+++ b/proton/doc.go
@@@ -1,33 -1,0 +1,70 @@@
 +/*
 +Licensed to the Apache Software Foundation (ASF) under one
 +or more contributor license agreements.  See the NOTICE file
 +distributed with this work for additional information
 +regarding copyright ownership.  The ASF licenses this file
 +to you under the Apache License, Version 2.0 (the
 +"License"); you may not use this file except in compliance
 +with the License.  You may obtain a copy of the License at
 +
 +  http://www.apache.org/licenses/LICENSE-2.0
 +
 +Unless required by applicable law or agreed to in writing,
 +software distributed under the License is distributed on an
 +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 +KIND, either express or implied.  See the License for the
 +specific language governing permissions and limitations
 +under the License.
 +*/
 +
 +/*
- Package proton is a Go binding for the proton AMQP protocol engine.
++Package proton is an event-driven, concurrent-unsafe Go library for AMQP messaging.
++You can write clients and servers using this library.
 +
- It alows you to construct and parse AMQP messages, and to implement AMQP
- clients, servers and intermediaries that can exchange messages with any
- AMQP 1.0 compliant endpoint.
++This package is a port of the Proton C API into Go (see
++http://qpid.apache.org/proton). Go programmers may find the 'electron' package
++more convenient. qpid.apache.org/electron provides a concurrent-safe API that
++allows you to run procedural loops in arbitrary goroutines rather than
++implementing event handlers that must run in a single goroutine.
++
++Consult the C API documentation at http://qpid.apache.org/proton for more
++information about the types here. There is a 1-1 correspondence between C type
++pn_foo_t and Go type proton.Foo, and between C function
++
++    pn_foo_do_something(pn_foo_t*, ...)
++
++and Go method
++
++    func (proton.Foo) DoSomething(...)
++
++The proton.Engine type pumps data between a Go net.Conn and a proton event loop
++goroutine that feeds events to a proton.MessagingHandler, which you must implement.
++See the Engine documentation for more.
++
++MessagingHandler defines an event handling interface that you can implement to
++react to AMQP protocol events. There is also a lower-level EventHandler, but
++MessagingHandler provides a simpler set of events and automates common tasks for
++you; for most applications it will be more convenient.
++
++NOTE: Methods on most types defined in this package (Sessions, Links etc.) can
++*only* be called in the event handler goroutine of the relevant
++Connection/Engine, either by the HandleEvent method of a handler type or in a
++function injected into the goroutine via Inject() or InjectWait(). Handlers and
++injected functions can set up channels to communicate with other goroutines.
++Note that the Injecter associated with a handler is available as part of the
++Event value passed to HandleEvent.
++
++Separate Engine instances are independent, and can run concurrently.
++
++The 'electron' package is built on the proton package but instead offers a
++concurrent-safe API that can use simple procedural loops rather than event
++handlers to express application logic. It is easier to use for most
++applications.
 +
- Encoding and decoding AMQP data follows the pattern of the standard
- encoding/json and encoding/xml packages.The mapping between AMQP and Go types is
- described in the documentation of the Marshal and Unmarshal functions.
 +*/
 +package proton
 +
++// #cgo LDFLAGS: -lqpid-proton
++import "C"
++
 +// This file is just for the package comment.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/proton/engine.go
----------------------------------------------------------------------
diff --cc proton/engine.go
index 0000000,0000000..2cebb49
new file mode 100644
--- /dev/null
+++ b/proton/engine.go
@@@ -1,0 -1,0 +1,402 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements.  See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership.  The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License.  You may obtain a copy of the License at
++
++  http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied.  See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package proton
++
++// #include <proton/connection.h>
++// #include <proton/event.h>
++// #include <proton/handlers.h>
++// #include <proton/session.h>
++// #include <proton/transport.h>
++// #include <memory.h>
++// #include <stdlib.h>
++//
++// PN_HANDLE(REMOTE_ADDR)
++import "C"
++
++import (
++	"fmt"
++	"io"
++	"net"
++	"qpid.apache.org/internal"
++	"sync"
++	"unsafe"
++)
++
++// Injecter allows functions to be "injected" into the event-processing loop, to
++// be called in the same goroutine as event handlers.
++type Injecter interface {
++	// Inject a function into the engine goroutine.
++	//
++	// f() will be called in the same goroutine as event handlers, so it can safely
++	// use values belonging to event handlers without synchronization. f() should
++	// not block, no further events or injected functions can be processed until
++	// f() returns.
++	//
++	// Returns a non-nil error if the function could not be injected and will
++	// never be called. Otherwise the function will eventually be called.
++	//
++	// Note that proton values (Link, Session, Connection etc.) that existed when
++	// Inject(f) was called may have become invalid by the time f() is executed.
++	// Handlers should keep track of Closed events to ensure proton values
++	// are not used after they become invalid. One technique is to have a map from
++	// proton values to application values. Check that the map has the correct
++	// proton/application value pair at the start of the injected function and
++	// delete the value from the map when handling a Closed event.
++	Inject(f func()) error
++
++	// InjectWait is like Inject but does not return till f() has completed.
++	// If f() cannot be injected it returns the error from Inject(), otherwise
++	// it returns the error from f()
++	InjectWait(f func() error) error
++}
++
++// bufferChan pairs a channel of byte slices with two equally sized buffers
++// that are handed out alternately ("ping-pong"), so one buffer can be filled
++// while the other is still in use by the receiver.
++type bufferChan struct {
++	buffers    chan []byte
++	buf1, buf2 []byte
++}
++
++// newBufferChan creates a bufferChan with an unbuffered channel and two
++// buffers of size bytes each.
++func newBufferChan(size int) *bufferChan {
++	return &bufferChan{
++		buffers: make(chan []byte),
++		buf1:    make([]byte, size),
++		buf2:    make([]byte, size),
++	}
++}
++
++// buffer swaps the two buffers and returns the newly current one, extended to
++// its full capacity.
++func (b *bufferChan) buffer() []byte {
++	next := b.buf2
++	b.buf2 = b.buf1
++	b.buf1 = next
++	return next[:cap(next)]
++}
++
++// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
++// Handler functions sequentially in a single goroutine. Actions taken by
++// Handler functions (such as sending messages) are encoded and written to the
++// net.Conn. You can create multiple Engines to handle multiple connections
++// concurrently.
++//
++// You implement the EventHandler and/or MessagingHandler interfaces and provide
++// those values to NewEngine(). Their HandleEvent method will be called in the
++// event-handling goroutine.
++//
++// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
++// other goroutines, store them, or use them as map indexes. Effectively they are
++// just pointers.  Other goroutines cannot call their methods directly but they
++// can create a function closure to call such methods and pass it to Engine.Inject()
++// to have it evaluated in the engine goroutine.
++//
++// You are responsible for ensuring you don't use an event value after it is
++// invalid. The handler methods will tell you when a value is no longer valid. For
++// example after a LinkClosed event, that link is no longer valid. If you do
++// Link.Close() yourself (in a handler or injected function) the link remains valid
++// until the corresponding LinkClosed event is received by the handler.
++//
++// Engine.Close() will take care of cleaning up any remaining values when you are
++// done with the Engine. All values associated with an engine become invalid when you
++// call Engine.Close()
++//
++// The qpid.apache.org/proton/concurrent package will do all this for you, so it
++// may be a better choice for some applications.
++//
++type Engine struct {
++	// err holds the error that stopped the engine, if any. Read it with Error().
++	err    internal.ErrorHolder
++	inject chan func() // Functions passed to Inject(), received by the Run() loop.
++
++	conn       net.Conn
++	connection Connection
++	transport  Transport
++	collector  *C.pn_collector_t
++	read       *bufferChan    // Read buffers channel.
++	write      *bufferChan    // Write buffers channel.
++	handlers   []EventHandler // Handlers for proton events.
++	running    chan struct{}  // This channel will be closed when the goroutines are done.
++	closeOnce  sync.Once
++}
++
++// bufferSize is the size in bytes of each of an Engine's read and write
++// buffers, see NewEngine().
++const bufferSize = 4096
++
++// Map of Connection to *Engine
++var engines = internal.MakeSafeMap()
++
++// NewEngine initializes an engine with a connection and handlers. To start it running:
++//    eng, err := NewEngine(...)
++//    go eng.Run()
++// The goroutine will exit when the engine is closed or disconnected.
++// You can check for errors with Engine.Error().
++//
++// The engine's connection is given a random UUID as its container-id and is
++// opened before NewEngine returns. If NewEngine returns an error, the proton
++// objects it allocated have been freed.
++//
++func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
++	eng := &Engine{
++		inject:     make(chan func()),
++		conn:       conn,
++		transport:  Transport{C.pn_transport()},
++		connection: Connection{C.pn_connection()},
++		collector:  C.pn_collector(),
++		handlers:   handlers,
++		read:       newBufferChan(bufferSize),
++		write:      newBufferChan(bufferSize),
++		running:    make(chan struct{}),
++	}
++	// release frees whichever proton objects were allocated. It is only used
++	// on the failure paths below, on success Run() frees them when it exits.
++	release := func() {
++		if !eng.connection.IsNil() {
++			eng.connection.Free()
++		}
++		if !eng.transport.IsNil() {
++			eng.transport.Free()
++		}
++		if eng.collector != nil {
++			C.pn_collector_free(eng.collector)
++		}
++	}
++	if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil {
++		release()
++		return nil, internal.Errorf("failed to allocate engine")
++	}
++
++	// TODO aconway 2015-06-25: connection settings for user, password, container etc.
++	// before transport.Bind() Set up connection before Engine, allow Engine or Reactor
++	// to run connection.
++
++	// Unique container-id by default.
++	eng.connection.SetContainer(internal.UUID4().String())
++	pnErr := eng.transport.Bind(eng.connection)
++	if pnErr != 0 {
++		release()
++		return nil, internal.Errorf("cannot setup engine: %s", internal.PnErrorCode(pnErr))
++	}
++	C.pn_connection_collect(eng.connection.pn, eng.collector)
++	eng.connection.Open()
++	// Save the engine's string form as the context for this connection.
++	connectionContexts.Put(eng.connection, connectionContext{eng.String()})
++	return eng, nil
++}
++
++// String returns the local and remote addresses of the engine's net.Conn,
++// joined by a dash.
++func (eng *Engine) String() string {
++	return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr())
++}
++
++// Id returns an identifier for this engine: the address of the Engine value
++// formatted as a pointer. It is distinct for every live Engine in the process.
++func (eng *Engine) Id() string {
++	return fmt.Sprintf("%p", eng)
++}
++
++// Error returns the error held by the engine, as set by Close(), Disconnect()
++// or by Run() on a network error or when the transport closes.
++func (eng *Engine) Error() error {
++	return eng.err.Get()
++}
++
++// Inject a function into the Engine's event loop.
++//
++// f() will be called in the same event-processing goroutine that calls Handler
++// methods. f() can safely call methods on values that belong to this engine
++// (Sessions, Links etc)
++//
++// The injected function has no parameters or return values. It is normally a
++// closure and can use channels to communicate with the injecting goroutine if
++// necessary.
++//
++// Returns a non-nil error if the engine is closed before the function could be
++// injected.
++func (eng *Engine) Inject(f func()) error {
++	// eng.inject is unbuffered, so the send only completes when the Run() loop
++	// receives f. If Run() has finished instead, eng.running is closed and the
++	// second case returns the engine's error.
++	select {
++	case eng.inject <- f:
++		return nil
++	case <-eng.running:
++		return eng.Error()
++	}
++}
++
++// InjectWait is like Inject but does not return till f() has completed or the
++// engine is closed, and returns an error value from f()
++//
++// If f cannot be injected the error from Inject() is returned. If the engine
++// stops before f's result is received, Engine.Error() is returned.
++func (eng *Engine) InjectWait(f func() error) error {
++	// done carries the result of f() from the engine goroutine back to the caller.
++	done := make(chan error)
++	defer close(done)
++	err := eng.Inject(func() { done <- f() })
++	if err != nil {
++		return err
++	}
++	select {
++	case <-eng.running:
++		return eng.Error()
++	case err := <-done:
++		return err
++	}
++}
++
++// Server puts the Engine in server mode, meaning it will auto-detect security settings on
++// the incoming connection such as use of SASL and SSL.
++// Must be called before Run()
++//
++func (eng *Engine) Server() { eng.transport.SetServer() }
++
++// Close the engine's connection, returns when the engine has exited.
++//
++// err is stored as the engine's error and passed to CloseError() for the
++// connection, in a function injected into the engine goroutine. Close then
++// waits for eng.running to be closed, which in the code visible here is only
++// done by Run().
++func (eng *Engine) Close(err error) {
++	eng.err.Set(err)
++	// The error returned by Inject is not checked: it is only non-nil when the
++	// engine has already stopped, in which case the wait below returns at once.
++	eng.Inject(func() {
++		CloseError(eng.connection, err)
++	})
++	<-eng.running
++}
++
++// Disconnect the engine's connection without an AMQP close, returns when the engine has exited.
++//
++// err is stored as the engine's error, then the underlying net.Conn is closed.
++func (eng *Engine) Disconnect(err error) {
++	eng.err.Set(err)
++	eng.conn.Close()
++	<-eng.running
++}
++
++// Run the engine. Engine.Run() will exit when the engine is closed or
++// disconnected.  You can check for errors after exit with Engine.Error().
++//
++// Run starts one goroutine that reads from the net.Conn and one that writes to
++// it, then loops in the calling goroutine: feeding received bytes to the
++// transport, handing transport output to the writer, calling injected
++// functions and dispatching proton events to the handlers. When the loop ends
++// it closes the net.Conn, waits for both goroutines, closes eng.running and
++// frees the proton objects owned by the engine. It returns the engine's error.
++//
++func (eng *Engine) Run() error {
++	wait := sync.WaitGroup{}
++	wait.Add(2) // Read and write goroutines
++
++	readErr := make(chan error, 1) // Don't block
++	go func() {                    // Read goroutine
++		defer wait.Done()
++		for {
++			rbuf := eng.read.buffer()
++			n, err := eng.conn.Read(rbuf)
++			if n > 0 {
++				// NOTE(review): this send is unbuffered. If the main loop below
++				// has already exited nothing receives it, and wait.Wait() would
++				// then never return - confirm whether that can happen.
++				eng.read.buffers <- rbuf[:n]
++			}
++			if err != nil {
++				readErr <- err
++				close(readErr)
++				close(eng.read.buffers)
++				return
++			}
++		}
++	}()
++
++	writeErr := make(chan error, 1) // Don't block
++	go func() {                     // Write goroutine
++		defer wait.Done()
++		for {
++			wbuf, ok := <-eng.write.buffers
++			if !ok {
++				return // Channel closed by the main loop on exit.
++			}
++			_, err := eng.conn.Write(wbuf)
++			if err != nil {
++				writeErr <- err
++				close(writeErr)
++				return
++			}
++		}
++	}()
++
++	wbuf := eng.write.buffer()[:0]
++
++	for eng.err.Get() == nil {
++		if len(wbuf) == 0 {
++			eng.pop(&wbuf)
++		}
++		// Don't set wchan unless there is something to write.
++		// A nil wchan disables the write case in the select below.
++		var wchan chan []byte
++		if len(wbuf) > 0 {
++			wchan = eng.write.buffers
++		}
++
++		select {
++		case buf, ok := <-eng.read.buffers: // Read a buffer
++			if ok {
++				eng.push(buf)
++			}
++		case wchan <- wbuf: // Write a buffer
++			// The writer now owns wbuf, switch to the other buffer.
++			wbuf = eng.write.buffer()[:0]
++		case f, ok := <-eng.inject: // Function injected from another goroutine
++			if ok {
++				f()
++			}
++		case err := <-readErr:
++			eng.netError(err)
++		case err := <-writeErr:
++			eng.netError(err)
++		}
++		// Dispatch any events generated by the action above.
++		eng.process()
++	}
++	close(eng.write.buffers) // Tells the write goroutine to exit.
++	eng.conn.Close() // Make sure connection is closed
++	wait.Wait()
++	close(eng.running) // Signal goroutines have exited and Error is set.
++
++	// Release the proton objects owned by the engine.
++	connectionContexts.Delete(eng.connection)
++	if !eng.connection.IsNil() {
++		eng.connection.Free()
++	}
++	if !eng.transport.IsNil() {
++		eng.transport.Free()
++	}
++	if eng.collector != nil {
++		C.pn_collector_free(eng.collector)
++	}
++	for _, h := range eng.handlers {
++		switch h := h.(type) {
++		case cHandler:
++			C.pn_handler_free(h.pn)
++		}
++	}
++	return eng.err.Get()
++}
++
++// netError records err as the engine's error and closes both the head and the
++// tail of the transport. Run() calls it with errors reported by the read and
++// write goroutines.
++func (eng *Engine) netError(err error) {
++	eng.err.Set(err)
++	eng.transport.CloseHead()
++	eng.transport.CloseTail()
++}
++
++// minInt returns the smaller of a and b.
++func minInt(a, b int) int {
++	smaller := b
++	if a < b {
++		smaller = a
++	}
++	return smaller
++}
++
++// pop moves pending output from the transport into *buf.
++//
++// *buf is resized to the number of bytes copied, which is at most cap(*buf).
++// It is left empty when the transport has nothing pending or reports PN_EOS.
++// pop panics if the transport reports any other negative pending count.
++func (eng *Engine) pop(buf *[]byte) {
++	pending := int(eng.transport.Pending())
++	switch {
++	case pending == int(C.PN_EOS):
++		// No more output will be produced, hand back an empty buffer whatever
++		// length the caller passed in.
++		*buf = (*buf)[:0]
++		return
++	case pending < 0:
++		panic(internal.Errorf("%s", internal.PnErrorCode(pending)))
++	}
++	size := minInt(pending, cap(*buf))
++	*buf = (*buf)[:size]
++	if size == 0 {
++		return
++	}
++	// Copy from the head of the transport's output, then tell the transport
++	// those bytes have been consumed.
++	C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), C.size_t(size))
++	internal.Assert(size > 0)
++	eng.transport.Pop(uint(size))
++}
++
++// push feeds all of buf to the transport, calling Push repeatedly with the
++// remaining bytes until everything has been accepted. It panics if Push
++// returns a value <= 0.
++func (eng *Engine) push(buf []byte) {
++	buf2 := buf
++	for len(buf2) > 0 {
++		n := eng.transport.Push(buf2)
++		if n <= 0 {
++			panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n)))
++		}
++		buf2 = buf2[n:] // Skip the bytes the transport accepted.
++	}
++}
++
++// handle passes e to every handler in the order they were given to
++// NewEngine(). After a transport-closed event it sets the engine's error to
++// io.EOF.
++func (eng *Engine) handle(e Event) {
++	for _, h := range eng.handlers {
++		h.HandleEvent(e)
++	}
++	if e.Type() == ETransportClosed {
++		eng.err.Set(io.EOF)
++	}
++}
++
++// process drains the collector: each event is peeked, passed to handle() and
++// only then popped, until the collector has no more events.
++func (eng *Engine) process() {
++	for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) {
++		eng.handle(makeEvent(ce, eng))
++		C.pn_collector_pop(eng.collector)
++	}
++}
++
++// Connection returns the proton Connection created for this engine by NewEngine().
++func (eng *Engine) Connection() Connection { return eng.connection }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org