You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2016/09/20 21:49:27 UTC

[1/6] qpid-proton git commit: NO-JIRA: Fixed missing error checks reported by go-errcheck

Repository: qpid-proton
Updated Branches:
  refs/heads/master 4bb26814c -> 6134e216c


NO-JIRA: Fixed missing error checks reported by go-errcheck


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/77b907b8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/77b907b8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/77b907b8

Branch: refs/heads/master
Commit: 77b907b8d8e26701866d4567981fc5e3f6e099c3
Parents: 0e37353
Author: Alan Conway <ac...@redhat.com>
Authored: Mon Sep 12 16:13:09 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Sep 20 17:39:15 2016 -0400

----------------------------------------------------------------------
 .../bindings/go/src/qpid.apache.org/amqp/marshal.go |  2 +-
 .../go/src/qpid.apache.org/electron/connection.go   |  4 ++--
 .../src/qpid.apache.org/electron/electron_test.go   | 16 ++++++++++++----
 .../go/src/qpid.apache.org/electron/handler.go      |  6 +++---
 .../go/src/qpid.apache.org/electron/link.go         |  2 +-
 .../go/src/qpid.apache.org/electron/receiver.go     |  4 ++--
 .../go/src/qpid.apache.org/electron/session.go      |  2 +-
 .../go/src/qpid.apache.org/proton/engine.go         | 16 +++++++++-------
 .../go/src/qpid.apache.org/proton/handlers.go       |  8 ++++----
 9 files changed, 35 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77b907b8/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
index 66e14d8..3b4a59e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
@@ -239,7 +239,7 @@ func NewEncoder(w io.Writer) *Encoder {
 func (e *Encoder) Encode(v interface{}) (err error) {
 	e.buffer, err = Marshal(v, e.buffer)
 	if err == nil {
-		e.writer.Write(e.buffer)
+		_, err = e.writer.Write(e.buffer)
 	}
 	return err
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77b907b8/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 3bc5dcf..d0733f2 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -185,11 +185,11 @@ func (c *connection) run() {
 	if !c.server {
 		c.pConnection.Open()
 	}
-	c.engine.Run()
+	_ = c.engine.Run()
 	if c.incoming != nil {
 		close(c.incoming)
 	}
-	c.closed(Closed)
+	_ = c.closed(Closed)
 }
 
 func (c *connection) Close(err error) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77b907b8/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
index 56b91bf..aa37d57 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
@@ -286,7 +286,9 @@ func TestTimeouts(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	rm.Accept()
+	if err := rm.Accept(); err != nil {
+		t.Fatal(err)
+	}
 	// Sender get ack
 	if a := <-ack; a.Status != Accepted || a.Error != nil {
 		t.Errorf("want (accepted, nil) got %#v", a)
@@ -433,13 +435,17 @@ func TestConnectionCloseInterrupt1(t *testing.T) {
 	snd, rcv := pairs.senderReceiver()
 	go doSend(snd, results)
 
-	rcv.Receive()
+	if _, err := rcv.Receive(); err != nil {
+		t.Error("receive", err)
+	}
 	rcv, snd = pairs.receiverSender()
 	go doReceive(rcv, results)
 
 	snd, rcv = pairs.senderReceiver()
 	ack := snd.SendWaitable(amqp.NewMessage())
-	rcv.Receive()
+	if _, err := rcv.Receive(); err != nil {
+		t.Error("receive", err)
+	}
 	go doDisposition(ack, results)
 
 	pairs.server.Close(want)
@@ -459,7 +465,9 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
 	// Connection.Close() interrupts Send, Receive, Disposition.
 	snd, rcv := pairs.senderReceiver()
 	go doSend(snd, results)
-	rcv.Receive()
+	if _, err := rcv.Receive(); err != nil {
+		t.Error("receive", err)
+	}
 
 	rcv, snd = pairs.receiverSender()
 	go doReceive(rcv, results)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77b907b8/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index ede7b6c..2a426aa 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -155,7 +155,7 @@ func (h *handler) addLink(pl proton.Link, el Endpoint) {
 
 func (h *handler) linkClosed(l proton.Link, err error) {
 	if link, ok := h.links[l]; ok {
-		link.closed(err)
+		_ = link.closed(err)
 		delete(h.links, l)
 	}
 }
@@ -187,11 +187,11 @@ func (h *handler) shutdown(err error) {
 	}
 	h.sentMessages = nil
 	for _, l := range h.links {
-		l.closed(err)
+		_ = l.closed(err)
 	}
 	h.links = nil
 	for _, s := range h.sessions {
-		s.closed(err)
+		_ = s.closed(err)
 	}
 	h.sessions = nil
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77b907b8/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index e0f6cb4..1d17894 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -213,7 +213,7 @@ func (l *link) Credit() (credit int, err error) {
 func (l *link) Capacity() int { return l.capacity }
 
 func (l *link) Close(err error) {
-	l.engine().Inject(func() {
+	_ = l.engine().Inject(func() {
 		if l.Error() == nil {
 			localClose(l.pLink, err)
 		}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77b907b8/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index fb234e2..781fd7c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -103,7 +103,7 @@ func (r *receiver) flow(credit int) {
 // Inject flow check per-caller call when prefetch is off.
 // Called with inc=1 at start of call, inc = -1 at end
 func (r *receiver) caller(inc int) {
-	r.engine().Inject(func() {
+	_ = r.engine().Inject(func() {
 		r.callers += inc
 		need := r.callers - (len(r.buffer) + r.pLink.Credit())
 		max := r.maxFlow()
@@ -117,7 +117,7 @@ func (r *receiver) caller(inc int) {
 // Inject flow top-up if prefetch is enabled
 func (r *receiver) flowTopUp() {
 	if r.prefetch {
-		r.engine().Inject(func() { r.flow(r.maxFlow()) })
+		_ = r.engine().Inject(func() { r.flow(r.maxFlow()) })
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77b907b8/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index 4b663aa..66a8995 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -69,7 +69,7 @@ func (s *session) pEndpoint() proton.Endpoint { return s.pSession }
 func (s *session) engine() *proton.Engine     { return s.connection.engine }
 
 func (s *session) Close(err error) {
-	s.engine().Inject(func() {
+	_ = s.engine().Inject(func() {
 		if s.Error() == nil {
 			localClose(s.pSession, err)
 		}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77b907b8/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index d9dcefd..6439fc1 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -205,8 +205,10 @@ func (eng *Engine) InjectWait(f func() error) error {
 //
 func (eng *Engine) Server() { eng.Transport().SetServer() }
 
-func (eng *Engine) disconnect() {
-	eng.conn.Close()
+func (eng *Engine) disconnect(err error) {
+	cond := eng.Transport().Condition()
+	cond.SetError(err)              // Set the provided error.
+	cond.SetError(eng.conn.Close()) // Use connection error if cond is not already set.
 	C.pn_connection_engine_disconnected(&eng.engine)
 }
 
@@ -214,13 +216,13 @@ func (eng *Engine) disconnect() {
 // If err != nil pass it to the remote end as the close condition.
 // Returns when the remote end closes or disconnects.
 func (eng *Engine) Close(err error) {
-	eng.Inject(func() { CloseError(eng.Connection(), err) })
+	_ = eng.Inject(func() { CloseError(eng.Connection(), err) })
 	<-eng.running
 }
 
 // CloseTimeout like Close but disconnect if the remote end doesn't close within timeout.
 func (eng *Engine) CloseTimeout(err error, timeout time.Duration) {
-	eng.Inject(func() { CloseError(eng.Connection(), err) })
+	_ = eng.Inject(func() { CloseError(eng.Connection(), err) })
 	select {
 	case <-eng.running:
 	case <-time.After(timeout):
@@ -231,7 +233,7 @@ func (eng *Engine) CloseTimeout(err error, timeout time.Duration) {
 // Disconnect the engine's connection immediately without an AMQP close.
 // Process any termination events before returning.
 func (eng *Engine) Disconnect(err error) {
-	eng.Inject(func() { eng.Transport().Condition().SetError(err); eng.disconnect() })
+	_ = eng.Inject(func() { eng.disconnect(err) })
 	<-eng.running
 }
 
@@ -358,8 +360,8 @@ func (eng *Engine) Run() error {
 	eng.err.Set(eng.Transport().Condition().Error())
 	close(readsIn)
 	close(writesIn)
-	eng.conn.Close() // Make sure connection is closed
-	wait.Wait()      // Wait for goroutines
+	_ = eng.conn.Close() // Make sure connection is closed
+	wait.Wait()          // Wait for goroutines
 
 	close(eng.running) // Signal goroutines have exited and Error is set.
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/77b907b8/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
index 0fd652c..24e5eb3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
@@ -351,7 +351,7 @@ func (d *MessagingAdapter) HandleEvent(e Event) {
 	}
 }
 
-func (d *MessagingAdapter) incoming(e Event) (err error) {
+func (d *MessagingAdapter) incoming(e Event) {
 	delivery := e.Delivery()
 	if delivery.HasMessage() {
 		d.mhandler.HandleMessagingEvent(MMessage, e)
@@ -367,7 +367,7 @@ func (d *MessagingAdapter) incoming(e Event) (err error) {
 	return
 }
 
-func (d *MessagingAdapter) outgoing(e Event) (err error) {
+func (d *MessagingAdapter) outgoing(e Event) {
 	delivery := e.Delivery()
 	if delivery.Updated() {
 		switch delivery.Remote().Type() {
@@ -378,11 +378,11 @@ func (d *MessagingAdapter) outgoing(e Event) (err error) {
 		case Released, Modified:
 			d.mhandler.HandleMessagingEvent(MReleased, e)
 		}
-		if err == nil && delivery.Settled() {
+		if delivery.Settled() {
 			// The delivery was settled remotely, inform the local end.
 			d.mhandler.HandleMessagingEvent(MSettled, e)
 		}
-		if err == nil && d.AutoSettle {
+		if d.AutoSettle {
 			delivery.Settle() // Local settle, don't mhandler MSettled till the remote end settles.
 		}
 	}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/6] qpid-proton git commit: NO-JIRA: go config.sh added env vars to Go tests directly.

Posted by ac...@apache.org.
NO-JIRA: go config.sh added env vars to Go tests directly.

You can run go tests like this:

    . config.sh
    go test qpid.apache.org/...   # All package tests
    go test qpid.apache.org/proton # proton package tests.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/158ad910
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/158ad910
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/158ad910

Branch: refs/heads/master
Commit: 158ad9105b2c6adcd9415c863d6f0c29cb0ec5d4
Parents: 4bb2681
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Sep 7 15:04:05 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Sep 20 17:39:15 2016 -0400

----------------------------------------------------------------------
 config.sh.in | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/158ad910/config.sh.in
----------------------------------------------------------------------
diff --git a/config.sh.in b/config.sh.in
index 7e58883..4902b61 100755
--- a/config.sh.in
+++ b/config.sh.in
@@ -81,3 +81,6 @@ fi
 if [[ -x "$(type -p saslpasswd2)" ]] ; then
     export SASLPASSWD=$(type -p saslpasswd2)
 fi
+
+# Location of interop test files.
+export PN_INTEROP_DIR=$PROTON_HOME/tests/interop


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[5/6] qpid-proton git commit: PROTON-1255: connection_engine separate bind from _init

Posted by ac...@apache.org.
PROTON-1255: connection_engine separate bind from _init

SASL/SSL properties must be set on the connection and transport *before* the
pn_transport_bind() call, so move the bind out of pn_connection_engine_init()
and add pn_connection_engine_start(), which must be called before using the
engine but after security (and other) properties are set.

Updated the Go and C++ engines accordingly.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/3d9fe620
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/3d9fe620
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/3d9fe620

Branch: refs/heads/master
Commit: 3d9fe620758bd55afae6af76b50a2bd5d6839232
Parents: b4d0912
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Sep 6 12:39:36 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Sep 20 17:39:15 2016 -0400

----------------------------------------------------------------------
 .../bindings/cpp/src/io/connection_engine.cpp   |  1 +
 .../go/src/qpid.apache.org/proton/engine.go     |  2 +
 proton-c/include/proton/connection_engine.h     | 40 ++++++++++++++------
 proton-c/src/engine/connection_engine.c         | 17 +++++----
 4 files changed, 40 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3d9fe620/proton-c/bindings/cpp/src/io/connection_engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp
index 4712b3e..d3f2667 100644
--- a/proton-c/bindings/cpp/src/io/connection_engine.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp
@@ -65,6 +65,7 @@ void connection_engine::configure(const connection_options& opts) {
     opts.apply_bound(c);
     handler_ =  opts.handler();
     connection_context::get(connection()).collector = c_engine_.collector;
+    pn_connection_engine_start(&c_engine_);
 }
 
 connection_engine::~connection_engine() {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3d9fe620/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 3cc6524..bfcb41c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -248,6 +248,8 @@ func (eng *Engine) Disconnect(err error) {
 // disconnected.  You can check for errors after exit with Engine.Error().
 //
 func (eng *Engine) Run() error {
+	C.pn_connection_engine_start(&eng.engine)
+
 	// Channels for read and write buffers going in and out of the read/write goroutines.
 	// The channels are unbuffered: we want to exchange buffers in seuquence.
 	readsIn, writesIn := make(chan []byte), make(chan []byte)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3d9fe620/proton-c/include/proton/connection_engine.h
----------------------------------------------------------------------
diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h
index cf5006b..b1476c7 100644
--- a/proton-c/include/proton/connection_engine.h
+++ b/proton-c/include/proton/connection_engine.h
@@ -43,16 +43,21 @@
 /// Summary of use:
 ///
 /// - while !pn_connection_engine_finished()
+///   - Call pn_connection_engine_dispatch() to dispatch events until it returns NULL.
 ///   - Read data from your source into pn_connection_engine_read_buffer()
 ///   - Call pn_connection_engine_read_done() when complete.
 ///   - Write data from pn_connection_engine_write_buffer() to your destination.
 ///   - Call pn_connection_engine_write_done() to indicate how much was written.
-///   - Call pn_connection_engine_dispatch() to dispatch events until it returns NULL.
 ///
-/// Note on error handling: most of the pn_connection_engine_*() functions do
-/// not return an error code. If a fatal error occurs, the transport error
-/// condition will be set and the next call to pn_connection_engine_dispatch()
-/// report it to the handler as a  PN_TRANSPORT_ERROR event and return false.
+/// Note on blocking: the _read/write_buffer and _read/write_done functions can
+/// all generate events that may cause the engine to finish. Before you wait for
+/// IO, always drain pn_connection_engine_dispatch() till it returns NULL and
+/// check pn_connection_engine_finished() in case there is nothing more to do..
+///
+/// Note on error handling: the pn_connection_engine_*() functions do not return
+/// an error code. If an error occurs it will be reported as a
+/// PN_TRANSPORT_ERROR event and pn_connection_engine_finished() will return
+/// true once all final events have been processed.
 ///
 /// @defgroup connection_engine The Connection Engine
 /// @{
@@ -79,16 +84,27 @@ typedef struct pn_connection_engine_t {
     pn_event_t* event;
 } pn_connection_engine_t;
 
-/// Initialize a pn_connection_engine_t struct with a new connection, transport
-/// and collector. Return 0 on success, a proton error code on failure.
+/// Initialize a pn_connection_engine_t struct with a new connection and
+/// transport.
+///
+/// Configure connection properties and call connection_engine_start() before
+/// using the engine.
+///
+/// Call pn_connection_engine_final to free resources when you are done.
+///
+///@return 0 on success, a proton error code on failure (@see error.h)
+///
 PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t* engine);
 
-/// Release the connection, transport and collector associated with engine, set all the pointers
-/// to NULL. Only call on an engine that was initialized with pn_connection_engine_init
+/// Start the engine, call after setting security and host properties.
+PN_EXTERN void pn_connection_engine_start(pn_connection_engine_t* engine);
+
+/// Free resources used by the engine, set the connection and transport pointers
+/// to NULL.
 PN_EXTERN void pn_connection_engine_final(pn_connection_engine_t* engine);
 
-/// The engine's read buffer. Read data from your IO source to buf.start, up to
-/// a max of buf.size. Then call pn_connection_engine_read_done().
+/// Get the engine's read buffer. Read data from your IO source to buf.start, up
+/// to a max of buf.size. Then call pn_connection_engine_read_done().
 ///
 /// buf.size==0 means the engine cannot read presently, calling
 /// pn_connection_engine_dispatch() may create more buffer space.
@@ -104,7 +120,7 @@ PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t*, size_t n)
 /// in pn_connection_engine_write_buffer()
 PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t*);
 
-/// The engine's write buffer. Write data from buf.start to your IO destination,
+/// Get the engine's write buffer. Write data from buf.start to your IO destination,
 /// up to a max of buf.size. Then call pn_connection_engine_write_done().
 ///
 /// buf.size==0 means the engine has nothing to write presently.  Calling

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/3d9fe620/proton-c/src/engine/connection_engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/connection_engine.c b/proton-c/src/engine/connection_engine.c
index 41a1bdc..adfb145 100644
--- a/proton-c/src/engine/connection_engine.c
+++ b/proton-c/src/engine/connection_engine.c
@@ -30,19 +30,20 @@ int pn_connection_engine_init(pn_connection_engine_t* e) {
     e->collector = pn_collector();
     if (!e->connection || !e->transport || !e->collector) {
         pn_connection_engine_final(e);
-        return PN_ERR;
-    }
-    int err;
-    // Bind before collect: don't expose the connection until it has a transport.
-    err = pn_transport_bind(e->transport, e->connection);
-    if (err) {
-        pn_connection_engine_final(e);
-        return err;
+        return PN_OUT_OF_MEMORY;
     }
     pn_connection_collect(e->connection, e->collector);
     return PN_OK;
 }
 
+void pn_connection_engine_start(pn_connection_engine_t* e) {
+    /*
+      Ignore bind errors. PN_STATE_ERR means we are already bound, any
+      other error will be delivered as an event.
+    */
+    pn_transport_bind(e->transport, e->connection);
+}
+
 void pn_connection_engine_final(pn_connection_engine_t* e) {
     if (e->transport && e->connection) {
         pn_transport_unbind(e->transport);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/6] qpid-proton git commit: PROTON-1305: Go sporadic test failure in electron_test

Posted by ac...@apache.org.
PROTON-1305: Go sporadic test failure in electron_test

Reliably reproduced by:
    . config.sh && go test qpid.apache.org/electron -run TestLinkCloseInterrupt -count 100

Fixed missing error check in receiver.ReceiveTimeout()


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/b4d0912b
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/b4d0912b
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/b4d0912b

Branch: refs/heads/master
Commit: b4d0912b0d8fa7c2ff2115635dece381f5d48868
Parents: 158ad91
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Sep 8 19:37:25 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Sep 20 17:39:15 2016 -0400

----------------------------------------------------------------------
 .../qpid.apache.org/electron/messaging_test.go  |  9 +++---
 .../go/src/qpid.apache.org/electron/receiver.go | 29 +++++++++++---------
 2 files changed, 21 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b4d0912b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
index 4a9e652..c5c351a 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -324,22 +324,23 @@ func (p *pairs) receiverSender() (Receiver, Sender) {
 type result struct {
 	label string
 	err   error
+	value interface{}
 }
 
 func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) }
 
 func doSend(snd Sender, results chan result) {
 	err := snd.SendSync(amqp.NewMessage()).Error
-	results <- result{"send", err}
+	results <- result{"send", err, nil}
 }
 
 func doReceive(rcv Receiver, results chan result) {
-	_, err := rcv.Receive()
-	results <- result{"receive", err}
+	msg, err := rcv.Receive()
+	results <- result{"receive", err, msg}
 }
 
 func doDisposition(ack <-chan Outcome, results chan result) {
-	results <- result{"disposition", (<-ack).Error}
+	results <- result{"disposition", (<-ack).Error, nil}
 }
 
 // Senders get credit immediately if receivers have prefetch set

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/b4d0912b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 026a385..606e4d6 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -125,28 +125,31 @@ func (r *receiver) Receive() (rm ReceivedMessage, err error) {
 	return r.ReceiveTimeout(Forever)
 }
 
-func (r *receiver) ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error) {
+func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
 	assert(r.buffer != nil, "Receiver is not open: %s", r)
-	select { // Check for immediate availability, avoid caller() inject.
-	case rm := <-r.buffer:
-		r.flowTopUp()
-		return rm, nil
-	default:
-	}
 	if !r.prefetch { // Per-caller flow control
-		r.caller(+1)
-		defer r.caller(-1)
+		select { // Check for immediate availability, avoid caller() inject
+		case rm2, ok := <-r.buffer:
+			if ok {
+				rm = rm2
+			} else {
+				err = r.Error()
+			}
+			return
+		default: // Not immediately available, inject caller() counts
+			r.caller(+1)
+			defer r.caller(-1)
+		}
 	}
 	rmi, err := timedReceive(r.buffer, timeout)
 	switch err {
 	case nil:
 		r.flowTopUp()
-		return rmi.(ReceivedMessage), err
+		rm = rmi.(ReceivedMessage)
 	case Closed:
-		return ReceivedMessage{}, r.Error()
-	default:
-		return ReceivedMessage{}, err
+		err = r.Error()
 	}
+	return
 }
 
 // Called in proton goroutine on MMessage event.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/6] qpid-proton git commit: PROTON-1293: Go binding SASL support.

Posted by ac...@apache.org.
PROTON-1293: Go binding SASL support.

proton package
- added sasl.h to the Go proton wrapper generator.
- capitalized as "SASL" *not* "Sasl", consistent with Go conventions (e.g. URL)
- added SASL() accessor to proton.Transport.
- Engine.Run(): extra dispatch and check, because pn_connection_engine_read/write_buffer()
  can generate events that close the transport on an auth failure.
- Engine.Id()/String() use transport address to match PN_TRACE_ logs.
- Drop auto-open of engine: let user set security first.

electron package:
- new ConnectionOption: User, Password, VirtualHost, SASLAllowInsecure, SASLAllowedMechs
- global settings with GlobalSASLConfigDir, GlobalSASLConfName
- IncomingConnection allows user to accept/reject authenticated incoming connections
- updated & improved documentation
- Endpoint.Sync() synchronous check of endpoint open, to verify authenticated.

Note: the Password option takes []byte, not string. There is no way to securely
erase a string from memory, whereas a byte slice can be overwritten.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0e37353d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0e37353d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0e37353d

Branch: refs/heads/master
Commit: 0e37353d7410841ebb291b14e05b7dc9943cd1ce
Parents: 3d9fe62
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Sep 6 13:11:09 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Sep 20 17:39:15 2016 -0400

----------------------------------------------------------------------
 proton-c/bindings/go/genwrap.go                 |  42 +-
 .../src/qpid.apache.org/electron/auth_test.go   | 124 +++++
 .../src/qpid.apache.org/electron/connection.go  | 209 ++++++--
 .../src/qpid.apache.org/electron/container.go   |  15 +-
 .../go/src/qpid.apache.org/electron/doc.go      |  35 +-
 .../qpid.apache.org/electron/electron_test.go   | 477 +++++++++++++++++++
 .../go/src/qpid.apache.org/electron/endpoint.go | 122 ++++-
 .../go/src/qpid.apache.org/electron/handler.go  |  80 ++--
 .../go/src/qpid.apache.org/electron/link.go     |  36 +-
 .../qpid.apache.org/electron/messaging_test.go  | 454 ------------------
 .../go/src/qpid.apache.org/electron/receiver.go |  10 +-
 .../go/src/qpid.apache.org/electron/sender.go   |  10 +-
 .../go/src/qpid.apache.org/proton/engine.go     |  83 ++--
 .../go/src/qpid.apache.org/proton/handlers.go   |   2 +-
 .../go/src/qpid.apache.org/proton/wrappers.go   |  21 +-
 .../src/qpid.apache.org/proton/wrappers_gen.go  |  80 +++-
 proton-c/src/engine/engine.c                    |   4 +
 17 files changed, 1126 insertions(+), 678 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/genwrap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/genwrap.go b/proton-c/bindings/go/genwrap.go
index c904638..f295a32 100644
--- a/proton-c/bindings/go/genwrap.go
+++ b/proton-c/bindings/go/genwrap.go
@@ -19,7 +19,9 @@ under the License.
 
 // Code generator to generate a thin Go wrapper API around the C proton API.
 //
-
+// Not run automatically, generated sources are checked in. To update the
+// generated sources run `go run genwrap.go` in this directory.
+//
 package main
 
 import (
@@ -44,7 +46,7 @@ func main() {
 	panicIf(err)
 	defer out.Close()
 
-	apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection", "transport"}
+	apis := []string{"session", "link", "delivery", "disposition", "condition", "terminus", "connection", "transport", "sasl"}
 	fmt.Fprintln(out, copyright)
 	fmt.Fprint(out, `
 package proton
@@ -89,12 +91,18 @@ import (
 	}
 }
 
+// Identify acronyms that should be uppercase not Mixedcase
+var acronym = regexp.MustCompile("(?i)SASL|AMQP")
+
 func mixedCase(s string) string {
 	result := ""
 	for _, w := range strings.Split(s, "_") {
-		if w != "" {
-			result = result + strings.ToUpper(w[0:1]) + strings.ToLower(w[1:])
+		if acronym.MatchString(w) {
+			w = strings.ToUpper(w)
+		} else {
+			w = strings.ToUpper(w[0:1]) + strings.ToLower(w[1:])
 		}
+		result = result + w
 	}
 	return result
 }
@@ -122,7 +130,13 @@ func findEnums(header string) (enums []enumType) {
 	return enums
 }
 
+// Types that are integral, not wrappers. Enums are added automatically.
+var simpleType = map[string]bool{
+	"State": true, // integral typedef
+}
+
 func genEnum(out io.Writer, name string, values []string) {
+	simpleType[mixedCase(name)] = true
 	doTemplate(out, []interface{}{name, values}, `
 {{$enumName := index . 0}}{{$values := index . 1}}
 type {{mixedCase $enumName}} C.pn_{{$enumName}}_t
@@ -140,10 +154,6 @@ func (e {{mixedCase $enumName}}) String() string {
 `)
 }
 
-var (
-	reSpace = regexp.MustCompile("\\s+")
-)
-
 func panicIf(err error) {
 	if err != nil {
 		panic(err)
@@ -202,7 +212,7 @@ var (
 	enumDefRe   = regexp.MustCompile("typedef enum {([^}]*)} pn_([a-z_]+)_t;")
 	enumValRe   = regexp.MustCompile("PN_[A-Z_]+")
 	skipEventRe = regexp.MustCompile("EVENT_NONE|REACTOR|SELECTABLE|TIMER")
-	skipFnRe    = regexp.MustCompile("attach|context|class|collect|link_recv|link_send|transport_.*logf$|transport_.*trace|transport_head|transport_push")
+	skipFnRe    = regexp.MustCompile("attach|context|class|collect|link_recv|link_send|transport_.*logf$|transport_.*trace|transport_head|transport_push|connection_set_password")
 )
 
 // Generate event wrappers.
@@ -268,20 +278,10 @@ func (g genType) goConvert(value string) string {
 	}
 }
 
-var notStruct = map[string]bool{
-	"EventType":        true,
-	"SndSettleMode":    true,
-	"RcvSettleMode":    true,
-	"TerminusType":     true,
-	"State":            true,
-	"Durability":       true,
-	"ExpiryPolicy":     true,
-	"DistributionMode": true,
-}
-
 func mapType(ctype string) (g genType) {
 	g.Ctype = "C." + strings.Trim(ctype, " \n")
 
+	// Special-case mappings for C types, default: is the general wrapper type case.
 	switch g.Ctype {
 	case "C.void":
 		g.Gotype = ""
@@ -331,7 +331,7 @@ func mapType(ctype string) (g genType) {
 			panic(fmt.Errorf("unknown C type %#v", g.Ctype))
 		}
 		g.Gotype = mixedCase(match[1])
-		if !notStruct[g.Gotype] {
+		if !simpleType[g.Gotype] {
 			g.ToGo = g.goLiteral
 			g.ToC = func(v string) string { return v + ".pn" }
 		}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
new file mode 100644
index 0000000..a090b78
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/auth_test.go
@@ -0,0 +1,124 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"fmt"
+	"io/ioutil"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"testing"
+)
+
+func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) {
+	client, server := newClientServerOpts(t, copts, sopts)
+	defer closeClientServer(client, server)
+
+	go func() {
+		for in := range server.Incoming() {
+			switch in := in.(type) {
+			case *IncomingConnection:
+				got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()}
+			}
+			in.Accept()
+		}
+	}()
+
+	err = client.Sync()
+	return
+}
+
+func TestAuthAnonymous(t *testing.T) {
+	fatalIf(t, configureSASL())
+	got, err := testAuthClientServer(t,
+		[]ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
+		[]ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
+	fatalIf(t, err)
+	errorIf(t, checkEqual(connectionSettings{"anonymous", "vhost"}, got))
+}
+
+func TestAuthPlain(t *testing.T) {
+	fatalIf(t, configureSASL())
+	got, err := testAuthClientServer(t,
+		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
+		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
+	fatalIf(t, err)
+	errorIf(t, checkEqual(connectionSettings{"fred@proton", ""}, got))
+}
+
+func TestAuthBadPass(t *testing.T) {
+	fatalIf(t, configureSASL())
+	_, err := testAuthClientServer(t,
+		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
+		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
+	if err == nil {
+		t.Error("Expected auth failure for bad pass")
+	}
+}
+
+func TestAuthBadUser(t *testing.T) {
+	fatalIf(t, configureSASL())
+	_, err := testAuthClientServer(t,
+		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
+		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
+	if err == nil {
+		t.Error("Expected auth failure for bad user")
+	}
+}
+
+var confDir string
+var confErr error
+
+func configureSASL() error {
+	if confDir != "" || confErr != nil {
+		return confErr
+	}
+	confDir, confErr = ioutil.TempDir("", "")
+	if confErr != nil {
+		return confErr
+	}
+
+	GlobalSASLConfigDir(confDir)
+	GlobalSASLConfigName("test")
+	conf := filepath.Join(confDir, "test.conf")
+
+	db := filepath.Join(confDir, "proton.sasldb")
+	cmd := exec.Command("saslpasswd2", "-c", "-p", "-f", db, "-u", "proton", "fred")
+	cmd.Stdin = strings.NewReader("xxx") // Password
+	if out, err := cmd.CombinedOutput(); err != nil {
+		confErr = fmt.Errorf("saslpasswd2 failed: %s\n%s", err, out)
+		return confErr
+	}
+	confStr := "sasldb_path: " + db + "\nmech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS\n"
+	if err := ioutil.WriteFile(conf, []byte(confStr), os.ModePerm); err != nil {
+		confErr = fmt.Errorf("write conf file %s failed: %s", conf, err)
+	}
+	return confErr
+}
+
+func TestMain(m *testing.M) {
+	status := m.Run()
+	if confDir != "" {
+		os.RemoveAll(confDir)
+	}
+	os.Exit(status)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 96feb1f..3bc5dcf 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -23,16 +23,34 @@ package electron
 import "C"
 
 import (
-	"fmt"
 	"net"
 	"qpid.apache.org/proton"
 	"sync"
 	"time"
 )
 
+// Settings associated with a Connection.
+type ConnectionSettings interface {
+	// Authenticated user name associated with the connection.
+	User() string
+
+	// The AMQP virtual host name for the connection.
+	//
+	// Optional, useful when the server has multiple names and provides different
+	// service based on the name the client uses to connect.
+	//
+	// By default it is set to the DNS host name that the client uses to connect,
+	// but it can be set to something different at the client side with the
+	// VirtualHost() option.
+	//
+	// Returns error if the connection fails to authenticate.
+	VirtualHost() string
+}
+
 // Connection is an AMQP connection, created by a Container.
 type Connection interface {
 	Endpoint
+	ConnectionSettings
 
 	// Sender opens a new sender on the DefaultSession.
 	Sender(...LinkOption) (Sender, error)
@@ -59,48 +77,79 @@ type Connection interface {
 	// WaitTimeout is like Wait but returns Timeout if the timeout expires.
 	WaitTimeout(time.Duration) error
 
-	// Incoming returns a channel for incoming endpoints opened by the remote end.
-	//
-	// To enable, pass AllowIncoming() when creating the Connection. Otherwise all
-	// incoming endpoint requests are automatically rejected and Incoming()
-	// returns nil.
-	//
-	// An Incoming value can be an *IncomingSession, *IncomingSender or
-	// *IncomingReceiver.  You must call Accept() to open the endpoint or Reject()
-	// to close it with an error. The specific Incoming types have additional
-	// methods to configure the endpoint.
-	//
-	// Not receiving from Incoming() or not calling Accept/Reject will block the
-	// electron event loop. Normally you would have a dedicated goroutine receive
-	// from Incoming() and start new goroutines to serve each incoming endpoint.
-	// The channel is closed when the Connection closes.
+	// Incoming returns a channel for incoming endpoints opened by the remote peer.
+	// See the Incoming interface for more.
 	//
+	// Not receiving from Incoming() or not calling Accept/Reject will block the
+	// electron event loop. You should run a loop to handle the types that
+	// interest you in a switch{} and Accept() all others.
 	Incoming() <-chan Incoming
 }
 
+type connectionSettings struct {
+	user, virtualHost string
+}
+
+func (c connectionSettings) User() string        { return c.user }
+func (c connectionSettings) VirtualHost() string { return c.virtualHost }
+
 // ConnectionOption can be passed when creating a connection to configure various options
 type ConnectionOption func(*connection)
 
-// Server returns a ConnectionOption to put the connection in server mode.
+// User returns a ConnectionOption that sets the user name for a connection.
+func User(user string) ConnectionOption {
+	return func(c *connection) {
+		c.user = user
+		c.pConnection.SetUser(user)
+	}
+}
+
+// VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection.
+// Only applies to outbound client connection.
+func VirtualHost(virtualHost string) ConnectionOption {
+	return func(c *connection) {
+		c.virtualHost = virtualHost
+		c.pConnection.SetHostname(virtualHost)
+	}
+}
+
+// Password returns a ConnectionOption to set the password used to establish a
+// connection.  Only applies to outbound client connection.
+//
+// The connection will erase its copy of the password from memory as soon as it
+// has been used to authenticate. If you are concerned about passwords staying in
+// memory you should never store them as strings, and should overwrite your
+// copy as soon as you are done with it.
+//
+func Password(password []byte) ConnectionOption {
+	return func(c *connection) { c.pConnection.SetPassword(password) }
+}
+
+// Server returns a ConnectionOption to put the connection in server mode for incoming connections.
 //
 // A server connection will do protocol negotiation to accept an incoming AMQP
 // connection. Normally you would call this for a connection created by
 // net.Listener.Accept()
 //
-func Server() ConnectionOption { return func(c *connection) { c.engine.Server() } }
+func Server() ConnectionOption {
+	return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) }
+}
 
-// AllowIncoming returns a ConnectionOption to enable incoming endpoint open requests.
-// See Connection.Incoming()
+// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
+// Connection.Incoming(). This is automatically set for Server() connections.
 func AllowIncoming() ConnectionOption {
 	return func(c *connection) { c.incoming = make(chan Incoming) }
 }
 
 type connection struct {
 	endpoint
+	connectionSettings
+
 	defaultSessionOnce, closeOnce sync.Once
 
 	container   *container
 	conn        net.Conn
+	server      bool
 	incoming    chan Incoming
 	handler     *handler
 	engine      *proton.Engine
@@ -110,23 +159,32 @@ type connection struct {
 }
 
 func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) {
-	c := &connection{container: cont, conn: conn}
+	c := &connection{
+		container: cont,
+		conn:      conn,
+	}
 	c.handler = newHandler(c)
 	var err error
 	c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
 	if err != nil {
 		return nil, err
 	}
+	c.pConnection = c.engine.Connection()
+	c.pConnection.SetContainer(cont.Id())
 	for _, set := range setting {
 		set(c)
 	}
+	globalSASLInit(c.engine)
+
 	c.endpoint.init(c.engine.String())
-	c.pConnection = c.engine.Connection()
 	go c.run()
 	return c, nil
 }
 
 func (c *connection) run() {
+	if !c.server {
+		c.pConnection.Open()
+	}
 	c.engine.Run()
 	if c.incoming != nil {
 		close(c.incoming)
@@ -201,46 +259,95 @@ func (c *connection) WaitTimeout(timeout time.Duration) error {
 	return c.Error()
 }
 
-func (c *connection) Incoming() <-chan Incoming { return c.incoming }
+func (c *connection) Incoming() <-chan Incoming {
+	assert(c.incoming != nil, "electron.Connection.Incoming() disabled for %s", c)
+	return c.incoming
+}
 
-// Incoming is the interface for incoming requests to open an endpoint.
-// Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
-type Incoming interface {
-	// Accept and open the endpoint.
-	Accept() Endpoint
+type IncomingConnection struct {
+	incoming
+	connectionSettings
+	c *connection
+}
 
-	// Reject the endpoint with an error
-	Reject(error)
+func newIncomingConnection(c *connection) *IncomingConnection {
+	c.user = c.pConnection.Transport().User()
+	c.virtualHost = c.pConnection.RemoteHostname()
+	return &IncomingConnection{
+		incoming:           makeIncoming(c.pConnection),
+		connectionSettings: c.connectionSettings,
+		c:                  c}
+}
 
-	// wait for and call the accept function, call in proton goroutine.
-	wait() error
-	pEndpoint() proton.Endpoint
+func (in *IncomingConnection) Accept() Endpoint {
+	return in.accept(func() Endpoint {
+		in.c.pConnection.Open()
+		return in.c
+	})
 }
 
-type incoming struct {
-	pep      proton.Endpoint
-	acceptCh chan func() error
+func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
+
+// SASLEnable returns a ConnectionOption that enables SASL authentication.
+// Only required if you don't set any other SASL options.
+func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } }
+
+// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL
+// mechanisms.
+//
+// Can be used on the client or the server to restrict the SASL mechanisms for a connection.
+// mechs is a space-separated list of mechanism names.
+//
+func SASLAllowedMechs(mechs string) ConnectionOption {
+	return func(c *connection) { sasl(c).AllowedMechs(mechs) }
 }
 
-func makeIncoming(e proton.Endpoint) incoming {
-	return incoming{pep: e, acceptCh: make(chan func() error)}
+// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear
+// text SASL authentication mechanisms
+//
+// By default the SASL layer is configured not to allow mechanisms that disclose
+// the clear text of the password over an unencrypted AMQP connection. This specifically
+// will disallow the use of the PLAIN mechanism without using SSL encryption.
+//
+// This default is to avoid disclosing password information accidentally over an
+// insecure network.
+//
+func SASLAllowInsecure(b bool) ConnectionOption {
+	return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
 }
 
-func (in *incoming) String() string   { return fmt.Sprintf("%s: %s", in.pep.Type(), in.pep) }
-func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } }
+// GlobalSASLConfigDir sets the SASL configuration directory for every
+// Connection created in this process. If not called, the default is determined
+// by your SASL installation.
+//
+// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
+//
+func GlobalSASLConfigDir(dir string) { globalSASLConfigDir = dir }
 
-// Call in proton goroutine, wait for and call the accept function fr
-func (in *incoming) wait() error { return (<-in.acceptCh)() }
+// GlobalSASLConfigName sets the SASL configuration name for every Connection
+// created in this process. If not called the default is "proton-server".
+//
+// The complete configuration file name is
+//     <sasl-config-dir>/<sasl-config-name>.conf
+//
+// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
+//
+func GlobalSASLConfigName(dir string) { globalSASLConfigName = dir }
 
-func (in *incoming) pEndpoint() proton.Endpoint { return in.pep }
+var (
+	globalSASLConfigName string
+	globalSASLConfigDir  string
+)
 
-// Called in app goroutine to send an accept function to proton and return the resulting endpoint.
-func (in *incoming) accept(f func() Endpoint) Endpoint {
-	done := make(chan Endpoint)
-	in.acceptCh <- func() error {
-		ep := f()
-		done <- ep
-		return nil
+// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we
+// can realistically offer is global configuration. Later if/when the pn_sasl C
+// impl is fixed we can offer per connection over-rides.
+func globalSASLInit(eng *proton.Engine) {
+	sasl := eng.Transport().SASL()
+	if globalSASLConfigName != "" {
+		sasl.ConfigName(globalSASLConfigName)
+	}
+	if globalSASLConfigDir != "" {
+		sasl.ConfigPath(globalSASLConfigDir)
 	}
-	return <-done
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
index b5ce6c0..1ab4df2 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
@@ -35,12 +35,17 @@ type Container interface {
 	// Id is a unique identifier for the container in your distributed application.
 	Id() string
 
-	// Create a new AMQP Connection over the supplied net.Conn connection.
+	// Enable AMQP over the supplied net.Conn. Returns a Connection endpoint.
 	//
-	// You must call Connection.Open() on the returned Connection, after
-	// setting any Connection properties you need to set. Note the net.Conn
-	// can be an outgoing connection (e.g. made with net.Dial) or an incoming
-	// connection (e.g. made with net.Listener.Accept())
+	// For client connections (e.g. established with net.Dial()), you can start
+	// using the connection immediately. Connection.Incoming() is disabled by
+	// default for clients, pass an AllowIncoming() option to enable incoming
+	// sessions and links.
+	//
+	// For server connection (e.g. established with net.Listener.Accept()) you
+	// must pass the Server() option and receive from the Connection.Incoming()
+	// channel. The first Incoming value will be an *IncomingConnection that lets
+	// you examine the connection properties before Accept() or Reject()
 	Connection(net.Conn, ...ConnectionOption) (Connection, error)
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index 46bde37..207d8ba 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -21,21 +21,20 @@ under the License.
 Package electron is a procedural, concurrent-safe Go library for AMQP messaging.
 You can write clients and servers using this library.
 
-Start by creating a Container with NewContainer. A Container represents a client
-or server application that can contain many incoming or outgoing connections.
+Start by creating a Container with NewContainer. An AMQP Container represents a
+single AMQP "application" and can contain client and server connections.
 
-Create connections with the standard Go 'net' package using net.Dial or
-net.Listen. Create an AMQP connection over a net.Conn with
-Container.Connection() and open it with Connection.Open().
+You can enable AMQP over any connection that implements the standard net.Conn
+interface. Typically you can connect with net.Dial() or listen for server
+connections with net.Listen.  Enable AMQP by passing the net.Conn to
+Container.Connection().
 
-AMQP sends messages over "links". Each link has a Sender end and a Receiver
-end. Connection.Sender() and Connection.Receiver() allow you to create links to
-Send() and Receive() messages.
-
-You can create an AMQP server connection by calling Connection.Server() and
-Connection.Listen() before calling Connection.Open(). A server connection can
-negotiate protocol security details and can accept incoming links opened from
-the remote end of the connection.
+AMQP allows bi-directional peer-to-peer message exchange as well as
+client-to-broker. Messages are sent over "links". Each link is one-way and has a
+Sender and Receiver end. Connection.Sender() and Connection.Receiver() open
+links to Send() and Receive() messages. Connection.Incoming() lets you accept
+incoming links opened by the remote peer. You can open and accept multiple links
+in both directions on a single Connection.
 
 */
 package electron
@@ -54,10 +53,10 @@ only accessed in the event-loop goroutine, so no locks are required there.
 The handler sets up channels as needed to get or send data from user goroutines
 using electron types like Sender or Receiver.
 
-We also use Engine.Inject to inject actions into the event loop from user
-goroutines. It is important to check at the start of an injected function that
-required objects are still valid, for example a link may be remotely closed
-between the time a Sender function calls Inject and the time the injected
-function is execute by the handler goroutine. See comments in endpoint.go for more.
+Engine.Inject injects actions into the event loop from user goroutines. It is
+important to check at the start of an injected function that required objects
+are still valid, for example a link may be remotely closed between the time a
+Sender function calls Inject and the time the injected function is executed by
+the handler goroutine.
 
 */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
new file mode 100644
index 0000000..56b91bf
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/electron_test.go
@@ -0,0 +1,477 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"fmt"
+	"net"
+	"path"
+	"qpid.apache.org/amqp"
+	"reflect"
+	"runtime"
+	"testing"
+	"time"
+)
+
+func fatalIf(t *testing.T, err error) {
+	if err != nil {
+		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
+		if ok {
+			_, file = path.Split(file)
+		}
+		t.Fatalf("(from %s:%d) %v", file, line, err)
+	}
+}
+
+func errorIf(t *testing.T, err error) {
+	if err != nil {
+		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
+		if ok {
+			_, file = path.Split(file)
+		}
+		t.Errorf("(from %s:%d) %v", file, line, err)
+	}
+}
+
+func checkEqual(want interface{}, got interface{}) error {
+	if !reflect.DeepEqual(want, got) {
+		return fmt.Errorf("%#v != %#v", want, got)
+	}
+	return nil
+}
+
+// Start a server, return listening addr and channel for incoming Connections.
+func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr, <-chan Connection) {
+	listener, err := net.Listen("tcp", "")
+	fatalIf(t, err)
+	addr := listener.Addr()
+	ch := make(chan Connection)
+	go func() {
+		conn, err := listener.Accept()
+		c, err := cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...)
+		fatalIf(t, err)
+		ch <- c
+	}()
+	return addr, ch
+}
+
+// Open a client connection and session, return the session.
+func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session {
+	conn, err := net.Dial(addr.Network(), addr.String())
+	fatalIf(t, err)
+	c, err := cont.Connection(conn, opts...)
+	fatalIf(t, err)
+	sn, err := c.Session()
+	fatalIf(t, err)
+	return sn
+}
+
+// Return client and server ends of the same connection.
+func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (client Session, server Connection) {
+	addr, ch := newServer(t, NewContainer("test-server"), sopts...)
+	client = newClient(t, NewContainer("test-client"), addr, copts...)
+	return client, <-ch
+}
+
+// Return client and server ends of the same connection.
+func newClientServer(t *testing.T) (client Session, server Connection) {
+	return newClientServerOpts(t, nil, nil)
+}
+
+// Close client and server
+func closeClientServer(client Session, server Connection) {
+	client.Connection().Close(nil)
+	server.Close(nil)
+}
+
+// Send a message one way with a client sender and server receiver, verify ack.
+func TestClientSendServerReceive(t *testing.T) {
+	nLinks := 3
+	nMessages := 3
+
+	rchan := make(chan Receiver, nLinks)
+	client, server := newClientServer(t)
+	go func() {
+		for in := range server.Incoming() {
+			switch in := in.(type) {
+			case *IncomingReceiver:
+				in.SetCapacity(1)
+				in.SetPrefetch(false)
+				rchan <- in.Accept().(Receiver)
+			default:
+				in.Accept()
+			}
+		}
+	}()
+
+	defer func() { closeClientServer(client, server) }()
+
+	s := make([]Sender, nLinks)
+	for i := 0; i < nLinks; i++ {
+		var err error
+		s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
+	r := make([]Receiver, nLinks)
+	for i := 0; i < nLinks; i++ {
+		r[i] = <-rchan
+	}
+
+	for i := 0; i < nLinks; i++ {
+		for j := 0; j < nMessages; j++ {
+			// Client send
+			ack := make(chan Outcome, 1)
+			sendDone := make(chan struct{})
+			go func() {
+				defer close(sendDone)
+				m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
+				var err error
+				s[i].SendAsync(m, ack, "testing")
+				if err != nil {
+					t.Fatal(err)
+				}
+			}()
+
+			// Server receive
+			rm, err := r[i].Receive()
+			if err != nil {
+				t.Fatal(err)
+			}
+			if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
+				t.Errorf("%#v != %#v", want, got)
+			}
+
+			// Should not be acknowledged on client yet
+			<-sendDone
+			select {
+			case <-ack:
+				t.Errorf("unexpected ack")
+			default:
+			}
+
+			// Server send ack
+			if err := rm.Reject(); err != nil {
+				t.Error(err)
+			}
+			// Client get ack.
+			if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected {
+				t.Error("unexpected ack: ", a.Status, a.Error, a.Value)
+			}
+		}
+	}
+}
+
+func TestClientReceiver(t *testing.T) {
+	nMessages := 3
+	client, server := newClientServer(t)
+	go func() {
+		for in := range server.Incoming() {
+			switch in := in.(type) {
+			case *IncomingSender:
+				s := in.Accept().(Sender)
+				go func() {
+					for i := int32(0); i < int32(nMessages); i++ {
+						out := s.SendSync(amqp.NewMessageWith(i))
+						if out.Error != nil {
+							t.Error(out.Error)
+							return
+						}
+					}
+					s.Close(nil)
+				}()
+			default:
+				in.Accept()
+			}
+		}
+	}()
+
+	r, err := client.Receiver(Source("foo"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	for i := int32(0); i < int32(nMessages); i++ {
+		rm, err := r.Receive()
+		if err != nil {
+			if err != Closed {
+				t.Error(err)
+			}
+			break
+		}
+		if err := rm.Accept(); err != nil {
+			t.Error(err)
+		}
+		if b, ok := rm.Message.Body().(int32); !ok || b != i {
+			t.Errorf("want %v, true got %v, %v", i, b, ok)
+		}
+	}
+	server.Close(nil)
+	client.Connection().Close(nil)
+}
+
+// Test timeout versions of waiting functions.
+func TestTimeouts(t *testing.T) {
+	var err error
+	rchan := make(chan Receiver, 1)
+	client, server := newClientServer(t)
+	go func() {
+		for i := range server.Incoming() {
+			switch i := i.(type) {
+			case *IncomingReceiver:
+				i.SetCapacity(1)
+				i.SetPrefetch(false)
+				rchan <- i.Accept().(Receiver) // Issue credit only on receive
+			default:
+				i.Accept()
+			}
+		}
+	}()
+	defer func() { closeClientServer(client, server) }()
+
+	// Open client sender
+	snd, err := client.Sender(Target("test"))
+	if err != nil {
+		t.Fatal(err)
+	}
+	rcv := <-rchan
+
+	// Test send with timeout
+	short := time.Millisecond
+	long := time.Second
+	m := amqp.NewMessage()
+	if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout.
+		t.Error("want Timeout got", err)
+	}
+	if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout.
+		t.Error("want Timeout got", err)
+	}
+	// Test receive with timeout
+	if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
+		t.Error("want Timeout got", err)
+	}
+	// Test receive with timeout
+	if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
+		t.Error("want Timeout got", err)
+	}
+	// There is now a credit on the link due to receive
+	ack := make(chan Outcome)
+	snd.SendAsyncTimeout(m, ack, nil, short)
+	// Disposition should timeout
+	select {
+	case <-ack:
+		t.Errorf("want Timeout got %#v", ack)
+	case <-time.After(short):
+	}
+
+	// Receive and accept
+	rm, err := rcv.ReceiveTimeout(long)
+	if err != nil {
+		t.Fatal(err)
+	}
+	rm.Accept()
+	// Sender get ack
+	if a := <-ack; a.Status != Accepted || a.Error != nil {
+		t.Errorf("want (accepted, nil) got %#v", a)
+	}
+}
+
+// A server that returns the opposite end of each client link via channels.
+type pairs struct {
+	t        *testing.T
+	client   Session
+	server   Connection
+	rchan    chan Receiver
+	schan    chan Sender
+	capacity int
+	prefetch bool
+}
+
+func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
+	p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
+	p.client, p.server = newClientServer(t)
+	go func() {
+		for i := range p.server.Incoming() {
+			switch i := i.(type) {
+			case *IncomingReceiver:
+				i.SetCapacity(capacity)
+				i.SetPrefetch(prefetch)
+				p.rchan <- i.Accept().(Receiver)
+			case *IncomingSender:
+				p.schan <- i.Accept().(Sender)
+			default:
+				i.Accept()
+			}
+		}
+	}()
+	return p
+}
+
+func (p *pairs) close() {
+	closeClientServer(p.client, p.server)
+}
+
+// Return a client sender and server receiver
+func (p *pairs) senderReceiver() (Sender, Receiver) {
+	snd, err := p.client.Sender()
+	fatalIf(p.t, err)
+	rcv := <-p.rchan
+	return snd, rcv
+}
+
+// Return a client receiver and server sender
+func (p *pairs) receiverSender() (Receiver, Sender) {
+	rcv, err := p.client.Receiver()
+	fatalIf(p.t, err)
+	snd := <-p.schan
+	return rcv, snd
+}
+
+type result struct {
+	label string
+	err   error
+	value interface{}
+}
+
+func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) }
+
+func doSend(snd Sender, results chan result) {
+	err := snd.SendSync(amqp.NewMessage()).Error
+	results <- result{"send", err, nil}
+}
+
+func doReceive(rcv Receiver, results chan result) {
+	msg, err := rcv.Receive()
+	results <- result{"receive", err, msg}
+}
+
+func doDisposition(ack <-chan Outcome, results chan result) {
+	results <- result{"disposition", (<-ack).Error, nil}
+}
+
+// Senders get credit immediately if receivers have prefetch set
+func TestSendReceivePrefetch(t *testing.T) {
+	pairs := newPairs(t, 1, true)
+	s, r := pairs.senderReceiver()
+	s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
+	if _, err := r.Receive(); err != nil {
+		t.Error(err)
+	}
+}
+
+// Senders do not get credit till Receive() if receivers don't have prefetch
+func TestSendReceiveNoPrefetch(t *testing.T) {
+	pairs := newPairs(t, 1, false)
+	s, r := pairs.senderReceiver()
+	done := make(chan struct{}, 1)
+	go func() {
+		s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
+		close(done)
+	}()
+	select {
+	case <-done:
+		t.Errorf("send should be blocked on credit")
+	default:
+		if _, err := r.Receive(); err != nil {
+			t.Error(err)
+		} else {
+			<-done
+		} // Should be unblocked now
+	}
+}
+
+// Test that closing Links interrupts blocked link functions.
+func TestLinkCloseInterrupt(t *testing.T) {
+	want := amqp.Error{Name: "x", Description: "all bad"}
+	pairs := newPairs(t, 1, false)
+	results := make(chan result) // Collect expected errors
+
+	// Note closing the link does not interrupt Send() calls, the AMQP spec says
+	// that deliveries can be settled after the link is closed.
+
+	// Receiver.Close() interrupts Receive()
+	snd, rcv := pairs.senderReceiver()
+	go doReceive(rcv, results)
+	rcv.Close(want)
+	if r := <-results; want != r.err {
+		t.Errorf("want %#v got %#v", want, r)
+	}
+
+	// Remote Sender.Close() interrupts Receive()
+	snd, rcv = pairs.senderReceiver()
+	go doReceive(rcv, results)
+	snd.Close(want)
+	if r := <-results; want != r.err {
+		t.Errorf("want %#v got %#v", want, r)
+	}
+}
+
+// Test closing the server end of a connection.
+func TestConnectionCloseInterrupt1(t *testing.T) {
+	want := amqp.Error{Name: "x", Description: "bad"}
+	pairs := newPairs(t, 1, true)
+	results := make(chan result) // Collect expected errors
+
+	// Connection.Close() interrupts Send, Receive, Disposition.
+	snd, rcv := pairs.senderReceiver()
+	go doSend(snd, results)
+
+	rcv.Receive()
+	rcv, snd = pairs.receiverSender()
+	go doReceive(rcv, results)
+
+	snd, rcv = pairs.senderReceiver()
+	ack := snd.SendWaitable(amqp.NewMessage())
+	rcv.Receive()
+	go doDisposition(ack, results)
+
+	pairs.server.Close(want)
+	for i := 0; i < 3; i++ {
+		if r := <-results; want != r.err {
+			t.Errorf("want %v got %v", want, r)
+		}
+	}
+}
+
+// Test closing the client end of the connection.
+func TestConnectionCloseInterrupt2(t *testing.T) {
+	want := amqp.Error{Name: "x", Description: "bad"}
+	pairs := newPairs(t, 1, true)
+	results := make(chan result) // Collect expected errors
+
+	// Connection.Close() interrupts Send, Receive, Disposition.
+	snd, rcv := pairs.senderReceiver()
+	go doSend(snd, results)
+	rcv.Receive()
+
+	rcv, snd = pairs.receiverSender()
+	go doReceive(rcv, results)
+
+	snd, rcv = pairs.senderReceiver()
+	ack := snd.SendWaitable(amqp.NewMessage())
+	go doDisposition(ack, results)
+
+	pairs.client.Connection().Close(want)
+	for i := 0; i < 3; i++ {
+		if r := <-results; want != r.err {
+			t.Errorf("want %v got %v", want, r.err)
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index fc701c6..ca93e5b 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -20,6 +20,7 @@ under the License.
 package electron
 
 import (
+	"fmt"
 	"io"
 	"qpid.apache.org/proton"
 )
@@ -28,12 +29,13 @@ import (
 // was closed cleanly.
 var Closed = io.EOF
 
-// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver.
+// Endpoint is the local end of a communications channel to the remote peer
+// process.  The following interfaces implement Endpoint: Connection, Session,
+// Sender and Receiver.
 //
-// Endpoints can be created locally or by the remote peer. You must Open() an
-// endpoint before you can use it. Some endpoints have additional Set*() methods
-// that must be called before Open() to take effect, see Connection, Session,
-// Link, Sender and Receiver for details.
+// You can create an endpoint with functions on Container, Connection and
+// Session. You can accept incoming endpoints from the remote peer using
+// Connection.Incoming()
 //
 type Endpoint interface {
 	// Close an endpoint and signal an error to the remote end if error != nil.
@@ -46,35 +48,58 @@ type Endpoint interface {
 	// Error() == Closed means the endpoint was closed without error.
 	Error() error
 
-	// Connection containing the endpoint
+	// Connection is the connection associated with this endpoint.
 	Connection() Connection
 
 	// Done returns a channel that will close when the endpoint closes.
-	// Error() will contain the reason.
+	// After Done() has closed, Error() will return the reason for closing.
 	Done() <-chan struct{}
 
+	// Sync() waits for the remote peer to confirm the endpoint is active or
+	// reject it with an error. You can call it immediately on new endpoints
+	// for more predictable error handling.
+	//
+	// AMQP is an asynchronous protocol. It is legal to create an endpoint and
+	// start using it without waiting for confirmation. This avoids a needless
+	// delay in the non-error case and improves throughput by "assuming the best".
+	//
+	// However if there *is* an error, these "optimistic" actions will fail. The
+	// endpoint and its children will be closed with an error. The error will only
+	// be detected when you try to use one of these endpoints or call Sync().
+	Sync() error
+
 	// Called in handler goroutine when endpoint is remotely closed.
 	closed(err error) error
+	wakeSync()
 }
 
-// DEVELOPER NOTES
-//
-// An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated
-//
+// Base implementation for Endpoint
 type endpoint struct {
-	err  proton.ErrorHolder
-	str  string // Must be set by the value that embeds endpoint.
-	done chan struct{}
+	err    proton.ErrorHolder
+	str    string // String() return value.
+	done   chan struct{}
+	active chan struct{}
 }
 
-func (e *endpoint) init(s string) { e.str = s; e.done = make(chan struct{}) }
+func (e *endpoint) init(s string) {
+	e.str = s
+	e.done = make(chan struct{})
+	e.active = make(chan struct{})
+}
+
+// Called in proton goroutine on remote open.
+func (e *endpoint) wakeSync() {
+	select { // Close active channel if not already closed.
+	case <-e.active:
+	default:
+		close(e.active)
+	}
+}
 
-// Called in handler on a Closed event. Marks the endpoint as closed and the corresponding
-// proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure
-// the pointer has not been invalidated.
+// Called in proton goroutine (from handler) on a Closed or Disconnected event.
 //
-// Returns the error stored on the endpoint, which may not be different to err if there was
-// already a n error
+// Set err if there is not already an error on the endpoint.
+// Return Error()
 func (e *endpoint) closed(err error) error {
 	select {
 	case <-e.done:
@@ -82,9 +107,10 @@ func (e *endpoint) closed(err error) error {
 	default:
 		e.err.Set(err)
 		e.err.Set(Closed)
+		e.wakeSync() // Make sure we wake up Sync()
 		close(e.done)
 	}
-	return e.err.Get()
+	return e.Error()
 }
 
 func (e *endpoint) String() string { return e.str }
@@ -93,6 +119,11 @@ func (e *endpoint) Error() error { return e.err.Get() }
 
 func (e *endpoint) Done() <-chan struct{} { return e.done }
 
+func (e *endpoint) Sync() error {
+	<-e.active
+	return e.Error()
+}
+
 // Call in proton goroutine to initiate closing an endpoint locally
 // handler will complete the close when remote end closes.
 func localClose(ep proton.Endpoint, err error) {
@@ -100,3 +131,52 @@ func localClose(ep proton.Endpoint, err error) {
 		proton.CloseError(ep, err)
 	}
 }
+
+// Incoming is the interface for incoming endpoints, see Connection.Incoming()
+//
+// Call Incoming.Accept() to open the endpoint or Incoming.Reject() to close it
+// with an optional error.
+//
+// Implementing types are *IncomingConnection, *IncomingSession, *IncomingSender
+// and *IncomingReceiver. Each type provides methods to examine the incoming
+// endpoint request and set configuration options for the local endpoint
+// before calling Accept() or Reject()
+type Incoming interface {
+	// Accept and open the endpoint.
+	Accept() Endpoint
+
+	// Reject the endpoint with an error
+	Reject(error)
+
+	// wait for and call the accept function, call in proton goroutine.
+	wait() error
+	pEndpoint() proton.Endpoint
+}
+
+type incoming struct {
+	pep      proton.Endpoint
+	acceptCh chan func() error
+}
+
+func makeIncoming(e proton.Endpoint) incoming {
+	return incoming{pep: e, acceptCh: make(chan func() error)}
+}
+
+func (in *incoming) String() string   { return fmt.Sprintf("%s: %s", in.pep.Type(), in.pep) }
+func (in *incoming) Reject(err error) { in.acceptCh <- func() error { return err } }
+
+// Call in proton goroutine, wait for and call the accept function.
+func (in *incoming) wait() error { return (<-in.acceptCh)() }
+
+func (in *incoming) pEndpoint() proton.Endpoint { return in.pep }
+
+// Called in app goroutine to send an accept function to proton and return the resulting endpoint.
+func (in *incoming) accept(f func() Endpoint) Endpoint {
+	done := make(chan Endpoint)
+	in.acceptCh <- func() error {
+		ep := f()
+		done <- ep
+		return nil
+	}
+	return <-done
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index eb53df3..ede7b6c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -78,28 +78,38 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 			h.linkError(e.Link(), "no sender")
 		}
 
+	case proton.MConnectionOpening:
+		if e.Connection().State().LocalUninit() { // Remotely opened
+			h.incoming(newIncomingConnection(h.connection))
+		}
+		h.connection.wakeSync()
+
 	case proton.MSessionOpening:
 		if e.Session().State().LocalUninit() { // Remotely opened
 			h.incoming(newIncomingSession(h, e.Session()))
 		}
+		h.sessions[e.Session()].wakeSync()
 
 	case proton.MSessionClosed:
 		h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
 
 	case proton.MLinkOpening:
 		l := e.Link()
-		if l.State().LocalActive() { // Already opened locally.
-			break
-		}
-		ss := h.sessions[l.Session()]
-		if ss == nil {
-			h.linkError(e.Link(), "no session")
-			break
-		}
-		if l.IsReceiver() {
-			h.incoming(&IncomingReceiver{makeIncomingLink(ss, l)})
+		if ss := h.sessions[l.Session()]; ss != nil {
+			if l.State().LocalUninit() { // Remotely opened.
+				if l.IsReceiver() {
+					h.incoming(newIncomingReceiver(ss, l))
+				} else {
+					h.incoming(newIncomingSender(ss, l))
+				}
+			}
+			if ep, ok := h.links[l]; ok {
+				ep.wakeSync()
+			} else {
+				h.linkError(l, "no link")
+			}
 		} else {
-			h.incoming(&IncomingSender{makeIncomingLink(ss, l)})
+			h.linkError(l, "no session")
 		}
 
 	case proton.MLinkClosing:
@@ -112,27 +122,14 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		h.connection.err.Set(e.Connection().RemoteCondition().Error())
 
 	case proton.MConnectionClosed:
-		h.connectionClosed(proton.EndpointError(e.Connection()))
+		h.shutdown(proton.EndpointError(e.Connection()))
 
 	case proton.MDisconnected:
-		h.connection.err.Set(e.Transport().Condition().Error())
-		// If err not set at this point (e.g. to Closed) then this is unexpected.
-		h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
-
-		err := h.connection.Error()
-
-		for l, _ := range h.links {
-			h.linkClosed(l, err)
-		}
-		h.links = nil
-		for _, s := range h.sessions {
-			s.closed(err)
-		}
-		h.sessions = nil
-		for _, sm := range h.sentMessages {
-			sm.ack <- Outcome{Unacknowledged, err, sm.value}
+		err := e.Transport().Condition().Error()
+		if err == nil {
+			err = amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection)
 		}
-		h.sentMessages = nil
+		h.shutdown(err)
 	}
 }
 
@@ -175,13 +172,26 @@ func (h *handler) sessionClosed(ps proton.Session, err error) {
 	}
 }
 
-func (h *handler) connectionClosed(err error) {
+func (h *handler) shutdown(err error) {
 	err = h.connection.closed(err)
-	// Close links first to avoid repeated scans of the link list by sessions.
-	for l, _ := range h.links {
-		h.linkClosed(l, err)
+	for _, sm := range h.sentMessages {
+		// Don't block but ensure outcome is sent eventually.
+		if sm.ack != nil {
+			o := Outcome{Unacknowledged, err, sm.value}
+			select {
+			case sm.ack <- o:
+			default:
+				go func(ack chan<- Outcome) { ack <- o }(sm.ack) // Deliver it eventually
+			}
+		}
+	}
+	h.sentMessages = nil
+	for _, l := range h.links {
+		l.closed(err)
 	}
-	for s, _ := range h.sessions {
-		h.sessionClosed(s, err)
+	h.links = nil
+	for _, s := range h.sessions {
+		s.closed(err)
 	}
+	h.sessions = nil
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 5d78a14..e0f6cb4 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -182,31 +182,19 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti
 	return l, nil
 }
 
-type incomingLink struct {
-	incoming
-	linkSettings
-	pLink proton.Link
-	sn    *session
-}
-
-// Set up a link from an incoming proton.Link.
-func makeIncomingLink(sn *session, pLink proton.Link) incomingLink {
-	l := incomingLink{
-		incoming: makeIncoming(pLink),
-		linkSettings: linkSettings{
-			isSender:  pLink.IsSender(),
-			source:    pLink.RemoteSource().Address(),
-			target:    pLink.RemoteTarget().Address(),
-			linkName:  pLink.Name(),
-			sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()),
-			rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()),
-			capacity:  1,
-			prefetch:  false,
-			pLink:     pLink,
-			session:   sn,
-		},
+func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
+	return linkSettings{
+		isSender:  pLink.IsSender(),
+		source:    pLink.RemoteSource().Address(),
+		target:    pLink.RemoteTarget().Address(),
+		linkName:  pLink.Name(),
+		sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()),
+		rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()),
+		capacity:  1,
+		prefetch:  false,
+		pLink:     pLink,
+		session:   sn,
 	}
-	return l
 }
 
 // Not part of Link interface but use by Sender and Receiver.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
deleted file mode 100644
index c5c351a..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ /dev/null
@@ -1,454 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package electron
-
-import (
-	"fmt"
-	"net"
-	"path"
-	"qpid.apache.org/amqp"
-	"runtime"
-	"testing"
-	"time"
-)
-
-func fatalIf(t *testing.T, err error) {
-	if err != nil {
-		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
-		if ok {
-			_, file = path.Split(file)
-		}
-		t.Fatalf("(from %s:%d) %v", file, line, err)
-	}
-}
-
-// Start a server, return listening addr and channel for incoming Connections.
-func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) {
-	listener, err := net.Listen("tcp", "")
-	fatalIf(t, err)
-	addr := listener.Addr()
-	ch := make(chan Connection)
-	go func() {
-		conn, err := listener.Accept()
-		c, err := cont.Connection(conn, Server(), AllowIncoming())
-		fatalIf(t, err)
-		ch <- c
-	}()
-	return addr, ch
-}
-
-// Open a client connection and session, return the session.
-func newClient(t *testing.T, cont Container, addr net.Addr) Session {
-	conn, err := net.Dial(addr.Network(), addr.String())
-	fatalIf(t, err)
-	c, err := cont.Connection(conn)
-	fatalIf(t, err)
-	sn, err := c.Session()
-	fatalIf(t, err)
-	return sn
-}
-
-// Return client and server ends of the same connection.
-func newClientServer(t *testing.T) (client Session, server Connection) {
-	addr, ch := newServer(t, NewContainer("test-server"))
-	client = newClient(t, NewContainer("test-client"), addr)
-	return client, <-ch
-}
-
-// Close client and server
-func closeClientServer(client Session, server Connection) {
-	client.Connection().Close(nil)
-	server.Close(nil)
-}
-
-// Send a message one way with a client sender and server receiver, verify ack.
-func TestClientSendServerReceive(t *testing.T) {
-	nLinks := 3
-	nMessages := 3
-
-	rchan := make(chan Receiver, nLinks)
-	client, server := newClientServer(t)
-	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingReceiver:
-				in.SetCapacity(1)
-				in.SetPrefetch(false)
-				rchan <- in.Accept().(Receiver)
-			default:
-				in.Accept()
-			}
-		}
-	}()
-
-	defer func() { closeClientServer(client, server) }()
-
-	s := make([]Sender, nLinks)
-	for i := 0; i < nLinks; i++ {
-		var err error
-		s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
-	r := make([]Receiver, nLinks)
-	for i := 0; i < nLinks; i++ {
-		r[i] = <-rchan
-	}
-
-	for i := 0; i < nLinks; i++ {
-		for j := 0; j < nMessages; j++ {
-			// Client send
-			ack := make(chan Outcome, 1)
-			sendDone := make(chan struct{})
-			go func() {
-				defer close(sendDone)
-				m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
-				var err error
-				s[i].SendAsync(m, ack, "testing")
-				if err != nil {
-					t.Fatal(err)
-				}
-			}()
-
-			// Server recieve
-			rm, err := r[i].Receive()
-			if err != nil {
-				t.Fatal(err)
-			}
-			if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
-				t.Errorf("%#v != %#v", want, got)
-			}
-
-			// Should not be acknowledged on client yet
-			<-sendDone
-			select {
-			case <-ack:
-				t.Errorf("unexpected ack")
-			default:
-			}
-
-			// Server send ack
-			if err := rm.Reject(); err != nil {
-				t.Error(err)
-			}
-			// Client get ack.
-			if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected {
-				t.Error("unexpected ack: ", a.Status, a.Error, a.Value)
-			}
-		}
-	}
-}
-
-func TestClientReceiver(t *testing.T) {
-	nMessages := 3
-	client, server := newClientServer(t)
-	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingSender:
-				s := in.Accept().(Sender)
-				go func() {
-					for i := int32(0); i < int32(nMessages); i++ {
-						out := s.SendSync(amqp.NewMessageWith(i))
-						if out.Error != nil {
-							t.Error(out.Error)
-							return
-						}
-					}
-					s.Close(nil)
-				}()
-			default:
-				in.Accept()
-			}
-		}
-	}()
-
-	r, err := client.Receiver(Source("foo"))
-	if err != nil {
-		t.Fatal(err)
-	}
-	for i := int32(0); i < int32(nMessages); i++ {
-		rm, err := r.Receive()
-		if err != nil {
-			if err != Closed {
-				t.Error(err)
-			}
-			break
-		}
-		if err := rm.Accept(); err != nil {
-			t.Error(err)
-		}
-		if b, ok := rm.Message.Body().(int32); !ok || b != i {
-			t.Errorf("want %v, true got %v, %v", i, b, ok)
-		}
-	}
-	server.Close(nil)
-	client.Connection().Close(nil)
-}
-
-// Test timeout versions of waiting functions.
-func TestTimeouts(t *testing.T) {
-	var err error
-	rchan := make(chan Receiver, 1)
-	client, server := newClientServer(t)
-	go func() {
-		for i := range server.Incoming() {
-			switch i := i.(type) {
-			case *IncomingReceiver:
-				i.SetCapacity(1)
-				i.SetPrefetch(false)
-				rchan <- i.Accept().(Receiver) // Issue credit only on receive
-			default:
-				i.Accept()
-			}
-		}
-	}()
-	defer func() { closeClientServer(client, server) }()
-
-	// Open client sender
-	snd, err := client.Sender(Target("test"))
-	if err != nil {
-		t.Fatal(err)
-	}
-	rcv := <-rchan
-
-	// Test send with timeout
-	short := time.Millisecond
-	long := time.Second
-	m := amqp.NewMessage()
-	if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout.
-		t.Error("want Timeout got", err)
-	}
-	if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout.
-		t.Error("want Timeout got", err)
-	}
-	// Test receive with timeout
-	if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
-		t.Error("want Timeout got", err)
-	}
-	// Test receive with timeout
-	if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
-		t.Error("want Timeout got", err)
-	}
-	// There is now a credit on the link due to receive
-	ack := make(chan Outcome)
-	snd.SendAsyncTimeout(m, ack, nil, short)
-	// Disposition should timeout
-	select {
-	case <-ack:
-		t.Errorf("want Timeout got %#v", ack)
-	case <-time.After(short):
-	}
-
-	// Receive and accept
-	rm, err := rcv.ReceiveTimeout(long)
-	if err != nil {
-		t.Fatal(err)
-	}
-	rm.Accept()
-	// Sender get ack
-	if a := <-ack; a.Status != Accepted || a.Error != nil {
-		t.Errorf("want (accepted, nil) got %#v", a)
-	}
-}
-
-// A server that returns the opposite end of each client link via channels.
-type pairs struct {
-	t        *testing.T
-	client   Session
-	server   Connection
-	rchan    chan Receiver
-	schan    chan Sender
-	capacity int
-	prefetch bool
-}
-
-func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
-	p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
-	p.client, p.server = newClientServer(t)
-	go func() {
-		for i := range p.server.Incoming() {
-			switch i := i.(type) {
-			case *IncomingReceiver:
-				i.SetCapacity(capacity)
-				i.SetPrefetch(prefetch)
-				p.rchan <- i.Accept().(Receiver)
-			case *IncomingSender:
-				p.schan <- i.Accept().(Sender)
-			default:
-				i.Accept()
-			}
-		}
-	}()
-	return p
-}
-
-func (p *pairs) close() {
-	closeClientServer(p.client, p.server)
-}
-
-// Return a client sender and server receiver
-func (p *pairs) senderReceiver() (Sender, Receiver) {
-	snd, err := p.client.Sender()
-	fatalIf(p.t, err)
-	rcv := <-p.rchan
-	return snd, rcv
-}
-
-// Return a client receiver and server sender
-func (p *pairs) receiverSender() (Receiver, Sender) {
-	rcv, err := p.client.Receiver()
-	fatalIf(p.t, err)
-	snd := <-p.schan
-	return rcv, snd
-}
-
-type result struct {
-	label string
-	err   error
-	value interface{}
-}
-
-func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) }
-
-func doSend(snd Sender, results chan result) {
-	err := snd.SendSync(amqp.NewMessage()).Error
-	results <- result{"send", err, nil}
-}
-
-func doReceive(rcv Receiver, results chan result) {
-	msg, err := rcv.Receive()
-	results <- result{"receive", err, msg}
-}
-
-func doDisposition(ack <-chan Outcome, results chan result) {
-	results <- result{"disposition", (<-ack).Error, nil}
-}
-
-// Senders get credit immediately if receivers have prefetch set
-func TestSendReceivePrefetch(t *testing.T) {
-	pairs := newPairs(t, 1, true)
-	s, r := pairs.senderReceiver()
-	s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
-	if _, err := r.Receive(); err != nil {
-		t.Error(err)
-	}
-}
-
-// Senders do not get credit till Receive() if receivers don't have prefetch
-func TestSendReceiveNoPrefetch(t *testing.T) {
-	pairs := newPairs(t, 1, false)
-	s, r := pairs.senderReceiver()
-	done := make(chan struct{}, 1)
-	go func() {
-		s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
-		close(done)
-	}()
-	select {
-	case <-done:
-		t.Errorf("send should be blocked on credit")
-	default:
-		if _, err := r.Receive(); err != nil {
-			t.Error(err)
-		} else {
-			<-done
-		} // Should be unblocked now
-	}
-}
-
-// Test that closing Links interrupts blocked link functions.
-func TestLinkCloseInterrupt(t *testing.T) {
-	want := amqp.Error{Name: "x", Description: "all bad"}
-	pairs := newPairs(t, 1, false)
-	results := make(chan result) // Collect expected errors
-
-	// Note closing the link does not interrupt Send() calls, the AMQP spec says
-	// that deliveries can be settled after the link is closed.
-
-	// Receiver.Close() interrupts Receive()
-	snd, rcv := pairs.senderReceiver()
-	go doReceive(rcv, results)
-	rcv.Close(want)
-	if r := <-results; want != r.err {
-		t.Errorf("want %#v got %#v", want, r)
-	}
-
-	// Remote Sender.Close() interrupts Receive()
-	snd, rcv = pairs.senderReceiver()
-	go doReceive(rcv, results)
-	snd.Close(want)
-	if r := <-results; want != r.err {
-		t.Errorf("want %#v got %#v", want, r)
-	}
-}
-
-// Test closing the server end of a connection.
-func TestConnectionCloseInterrupt1(t *testing.T) {
-	want := amqp.Error{Name: "x", Description: "bad"}
-	pairs := newPairs(t, 1, true)
-	results := make(chan result) // Collect expected errors
-
-	// Connection.Close() interrupts Send, Receive, Disposition.
-	snd, rcv := pairs.senderReceiver()
-	go doSend(snd, results)
-
-	rcv.Receive()
-	rcv, snd = pairs.receiverSender()
-	go doReceive(rcv, results)
-
-	snd, rcv = pairs.senderReceiver()
-	ack := snd.SendWaitable(amqp.NewMessage())
-	rcv.Receive()
-	go doDisposition(ack, results)
-
-	pairs.server.Close(want)
-	for i := 0; i < 3; i++ {
-		if r := <-results; want != r.err {
-			t.Errorf("want %v got %v", want, r)
-		}
-	}
-}
-
-// Test closing the client end of the connection.
-func TestConnectionCloseInterrupt2(t *testing.T) {
-	want := amqp.Error{Name: "x", Description: "bad"}
-	pairs := newPairs(t, 1, true)
-	results := make(chan result) // Collect expected errors
-
-	// Connection.Close() interrupts Send, Receive, Disposition.
-	snd, rcv := pairs.senderReceiver()
-	go doSend(snd, results)
-	rcv.Receive()
-
-	rcv, snd = pairs.receiverSender()
-	go doReceive(rcv, results)
-
-	snd, rcv = pairs.senderReceiver()
-	ack := snd.SendWaitable(amqp.NewMessage())
-	go doDisposition(ack, results)
-
-	pairs.client.Connection().Close(want)
-	for i := 0; i < 3; i++ {
-		if r := <-results; want != r.err {
-			t.Errorf("want %v got %v", want, r.err)
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 606e4d6..fb234e2 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -213,7 +213,15 @@ func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Releas
 // IncomingReceiver is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a receiver link.
 type IncomingReceiver struct {
-	incomingLink
+	incoming
+	linkSettings
+}
+
+func newIncomingReceiver(sn *session, pLink proton.Link) *IncomingReceiver {
+	return &IncomingReceiver{
+		incoming:     makeIncoming(pLink),
+		linkSettings: makeIncomingLinkSettings(pLink, sn),
+	}
 }
 
 // SetCapacity sets the capacity of the incoming receiver, call before Accept()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 1d0fc60..8badf35 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -259,7 +259,15 @@ type sentMessage struct {
 // IncomingSender is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a sender link.
 type IncomingSender struct {
-	incomingLink
+	incoming
+	linkSettings
+}
+
+func newIncomingSender(sn *session, pLink proton.Link) *IncomingSender {
+	return &IncomingSender{
+		incoming:     makeIncoming(pLink),
+		linkSettings: makeIncomingLinkSettings(pLink, sn),
+	}
 }
 
 // Accept accepts an incoming sender endpoint

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index bfcb41c..d9dcefd 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -127,9 +127,6 @@ func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
 	if pnErr := C.pn_connection_engine_init(&eng.engine); pnErr != 0 {
 		return nil, fmt.Errorf("cannot setup engine: %s", PnErrorCode(pnErr))
 	}
-	// Unique container-id by default.
-	eng.Connection().SetContainer(UUID4().String()) // FIXME aconway 2016-06-21:
-	eng.Connection().Open()
 	return eng, nil
 }
 
@@ -143,12 +140,6 @@ func byteSlice(data unsafe.Pointer, size C.size_t) []byte {
 	}
 }
 
-func (eng *Engine) buffers() ([]byte, []byte) {
-	r := C.pn_connection_engine_read_buffer(&eng.engine)
-	w := C.pn_connection_engine_write_buffer(&eng.engine)
-	return byteSlice(unsafe.Pointer(r.start), r.size), byteSlice(unsafe.Pointer(w.start), w.size)
-}
-
 func (eng *Engine) Connection() Connection {
 	return Connection{C.pn_connection_engine_connection(&eng.engine)}
 }
@@ -158,11 +149,12 @@ func (eng *Engine) Transport() Transport {
 }
 
 func (eng *Engine) String() string {
-	return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr())
+	return fmt.Sprintf("[%s]%s-%s", eng.Id(), eng.conn.LocalAddr(), eng.conn.RemoteAddr())
 }
 
 func (eng *Engine) Id() string {
-	return fmt.Sprintf("%p", eng)
+	// Use transport address to match default PN_TRACE_FRM=1 output.
+	return fmt.Sprintf("%p", eng.Transport().CPtr())
 }
 
 func (eng *Engine) Error() error {
@@ -213,8 +205,7 @@ func (eng *Engine) InjectWait(f func() error) error {
 //
 func (eng *Engine) Server() { eng.Transport().SetServer() }
 
-// FIXME aconway 2016-06-21: rename
-func (eng *Engine) disconnect() { // FIXME aconway 2016-06-21: disconnected
+func (eng *Engine) disconnect() {
 	eng.conn.Close()
 	C.pn_connection_engine_disconnected(&eng.engine)
 }
@@ -244,12 +235,35 @@ func (eng *Engine) Disconnect(err error) {
 	<-eng.running
 }
 
+func (eng *Engine) dispatch() bool {
+	for {
+		if cevent := C.pn_connection_engine_dispatch(&eng.engine); cevent != nil {
+			event := makeEvent(cevent, eng)
+			for _, h := range eng.handlers {
+				h.HandleEvent(event)
+			}
+		} else {
+			break
+		}
+	}
+	return !bool(C.pn_connection_engine_finished(&eng.engine))
+}
+
+func (eng *Engine) writeBuffer() []byte {
+	w := C.pn_connection_engine_write_buffer(&eng.engine)
+	return byteSlice(unsafe.Pointer(w.start), w.size)
+}
+
+func (eng *Engine) readBuffer() []byte {
+	r := C.pn_connection_engine_read_buffer(&eng.engine)
+	return byteSlice(unsafe.Pointer(r.start), r.size)
+}
+
 // Run the engine. Engine.Run() will exit when the engine is closed or
 // disconnected.  You can check for errors after exit with Engine.Error().
 //
 func (eng *Engine) Run() error {
 	C.pn_connection_engine_start(&eng.engine)
-
 	// Channels for read and write buffers going in and out of the read/write goroutines.
 	// The channels are unbuffered: we want to exchange buffers in seuquence.
 	readsIn, writesIn := make(chan []byte), make(chan []byte)
@@ -298,9 +312,18 @@ func (eng *Engine) Run() error {
 		}
 	}()
 
-	for !C.pn_connection_engine_finished(&eng.engine) {
-		// Enable readIn/writeIn channles only if we have a buffer.
-		readBuf, writeBuf := eng.buffers()
+	for eng.dispatch() {
+		readBuf := eng.readBuffer()
+		writeBuf := eng.writeBuffer()
+		// Note that getting the buffers can generate events (eg. SASL events) that
+		// might close the transport. Check if we are already finished before
+		// blocking for IO.
+		if !eng.dispatch() {
+			break
+		}
+
+		// sendReads/sendWrites are nil (not sendable in select) unless we have a
+		// buffer to read/write
 		var sendReads, sendWrites chan []byte
 		if readBuf != nil {
 			sendReads = readsIn
@@ -311,6 +334,7 @@ func (eng *Engine) Run() error {
 
 		// Send buffers to the read/write goroutines if we have them.
 		// Get buffers from the read/write goroutines and process them
+		// Check for injected functions
 		select {
 
 		case sendReads <- readBuf:
@@ -318,36 +342,18 @@ func (eng *Engine) Run() error {
 		case sendWrites <- writeBuf:
 
 		case buf := <-readsOut:
-			if len(buf) > 0 {
-				C.pn_connection_engine_read_done(&eng.engine, C.size_t(len(buf)))
-			} else {
-				panic(fmt.Sprintf("read buf %v", buf))
-			}
+			C.pn_connection_engine_read_done(&eng.engine, C.size_t(len(buf)))
 
 		case buf := <-writesOut:
-			if len(buf) > 0 {
-				C.pn_connection_engine_write_done(&eng.engine, C.size_t(len(buf)))
-			} else {
-				panic(fmt.Sprintf("write buf %v", buf))
-			}
+			C.pn_connection_engine_write_done(&eng.engine, C.size_t(len(buf)))
 
 		case f, ok := <-eng.inject: // Function injected from another goroutine
 			if ok {
 				f()
 			}
 		}
-
-		for {
-			cevent := C.pn_connection_engine_dispatch(&eng.engine)
-			if cevent == nil {
-				break
-			}
-			event := makeEvent(cevent, eng)
-			for _, h := range eng.handlers {
-				h.HandleEvent(event)
-			}
-		}
 	}
+
 	eng.err.Set(EndpointError(eng.Connection()))
 	eng.err.Set(eng.Transport().Condition().Error())
 	close(readsIn)
@@ -365,6 +371,5 @@ func (eng *Engine) Run() error {
 			C.pn_handler_free(h.pn)
 		}
 	}
-	// FIXME aconway 2016-06-21: consistent error handling
 	return eng.err.Get()
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
index 2a96d81..0fd652c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
@@ -206,11 +206,11 @@ func (d endpointDelegator) HandleEvent(e Event) {
 		}
 
 	case d.remoteOpen:
+		d.delegator.mhandler.HandleMessagingEvent(d.opening, e)
 		switch {
 		case state.LocalActive():
 			d.delegator.mhandler.HandleMessagingEvent(d.opened, e)
 		case state.LocalUninit():
-			d.delegator.mhandler.HandleMessagingEvent(d.opening, e)
 			if d.delegator.AutoOpen {
 				endpoint.Open()
 			}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 70611d3..3303f0a 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -29,6 +29,7 @@ package proton
 //#include <proton/link.h>
 //#include <proton/link.h>
 //#include <proton/object.h>
+//#include <proton/sasl.h>
 //#include <proton/session.h>
 //#include <proton/transport.h>
 //#include <stdlib.h>
@@ -292,7 +293,8 @@ func (s Session) Receiver(name string) Link {
 
 // Unique (per process) string identifier for a connection, useful for debugging.
 func (c Connection) String() string {
-	return fmt.Sprintf("%x", c.pn)
+	// Use the transport address to match the default transport logs from PN_TRACE.
+	return fmt.Sprintf("%p", c.Transport().CPtr())
 }
 
 func (c Connection) Type() string {
@@ -323,6 +325,18 @@ func (c Connection) Sessions(state State) (sessions []Session) {
 	return
 }
 
+// SetPassword takes []byte not string because it is impossible to erase a string
+// from memory reliably. Proton will not keep the password in memory longer than
+// needed, the caller should overwrite their copy on return.
+//
+// The password must not contain embedded nul characters, a trailing nul is ignored.
+func (c Connection) SetPassword(password []byte) {
+	if len(password) == 0 || password[len(password)-1] != 0 {
+		password = append(password, 0) // Proton requires a terminating null.
+	}
+	C.pn_connection_set_password(c.pn, (*C.char)(unsafe.Pointer(&password[0])))
+}
+
 func (s Session) String() string {
 	return fmt.Sprintf("%s/%p", s.Connection(), s.pn)
 }
@@ -384,3 +398,8 @@ func (t Transport) Head() unsafe.Pointer {
 func (t Transport) Push(bytes []byte) int {
 	return int(C.pn_transport_push(t.pn, (*C.char)(unsafe.Pointer(&bytes[0])), C.size_t(len(bytes))))
 }
+
+// Get the SASL object for the transport.
+func (t Transport) SASL() SASL {
+	return SASL{C.pn_sasl(t.pn)}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
index 183d6ec..38c76cc 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers_gen.go
@@ -42,6 +42,7 @@ import (
 // #include <proton/terminus.h>
 // #include <proton/connection.h>
 // #include <proton/transport.h>
+// #include <proton/sasl.h>
 import "C"
 
 type EventType int
@@ -704,12 +705,6 @@ func (c Connection) SetUser(user string) {
 
 	C.pn_connection_set_user(c.pn, userC)
 }
-func (c Connection) SetPassword(password string) {
-	passwordC := C.CString(password)
-	defer C.free(unsafe.Pointer(passwordC))
-
-	C.pn_connection_set_password(c.pn, passwordC)
-}
 func (c Connection) User() string {
 	return C.GoString(C.pn_connection_get_user(c.pn))
 }
@@ -873,3 +868,76 @@ func (t Transport) Tick(now time.Time) time.Time {
 func (t Transport) Connection() Connection {
 	return Connection{C.pn_transport_connection(t.pn)}
 }
+
+// Wrappers for declarations in sasl.h
+
+type SASLOutcome C.pn_sasl_outcome_t
+
+const (
+	SASLNone SASLOutcome = C.PN_SASL_NONE
+	SASLOk   SASLOutcome = C.PN_SASL_OK
+	SASLAuth SASLOutcome = C.PN_SASL_AUTH
+	SASLSys  SASLOutcome = C.PN_SASL_SYS
+	SASLPerm SASLOutcome = C.PN_SASL_PERM
+	SASLTemp SASLOutcome = C.PN_SASL_TEMP
+)
+
+func (e SASLOutcome) String() string {
+	switch e {
+
+	case C.PN_SASL_NONE:
+		return "SASLNone"
+	case C.PN_SASL_OK:
+		return "SASLOk"
+	case C.PN_SASL_AUTH:
+		return "SASLAuth"
+	case C.PN_SASL_SYS:
+		return "SASLSys"
+	case C.PN_SASL_PERM:
+		return "SASLPerm"
+	case C.PN_SASL_TEMP:
+		return "SASLTemp"
+	}
+	return "unknown"
+}
+
+type SASL struct{ pn *C.pn_sasl_t }
+
+func (s SASL) IsNil() bool          { return s.pn == nil }
+func (s SASL) CPtr() unsafe.Pointer { return unsafe.Pointer(s.pn) }
+func (s SASL) Done(outcome SASLOutcome) {
+	C.pn_sasl_done(s.pn, C.pn_sasl_outcome_t(outcome))
+}
+func (s SASL) Outcome() SASLOutcome {
+	return SASLOutcome(C.pn_sasl_outcome(s.pn))
+}
+func (s SASL) User() string {
+	return C.GoString(C.pn_sasl_get_user(s.pn))
+}
+func (s SASL) Mech() string {
+	return C.GoString(C.pn_sasl_get_mech(s.pn))
+}
+func (s SASL) AllowedMechs(mechs string) {
+	mechsC := C.CString(mechs)
+	defer C.free(unsafe.Pointer(mechsC))
+
+	C.pn_sasl_allowed_mechs(s.pn, mechsC)
+}
+func (s SASL) SetAllowInsecureMechs(insecure bool) {
+	C.pn_sasl_set_allow_insecure_mechs(s.pn, C.bool(insecure))
+}
+func (s SASL) AllowInsecureMechs() bool {
+	return bool(C.pn_sasl_get_allow_insecure_mechs(s.pn))
+}
+func (s SASL) ConfigName(name string) {
+	nameC := C.CString(name)
+	defer C.free(unsafe.Pointer(nameC))
+
+	C.pn_sasl_config_name(s.pn, nameC)
+}
+func (s SASL) ConfigPath(path string) {
+	pathC := C.CString(path)
+	defer C.free(unsafe.Pointer(pathC))
+
+	C.pn_sasl_config_path(s.pn, pathC)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0e37353d/proton-c/src/engine/engine.c
----------------------------------------------------------------------
diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c
index ca7e4d8..cb1f479 100644
--- a/proton-c/src/engine/engine.c
+++ b/proton-c/src/engine/engine.c
@@ -594,6 +594,10 @@ void pn_connection_set_user(pn_connection_t *connection, const char *user)
 void pn_connection_set_password(pn_connection_t *connection, const char *password)
 {
     assert(connection);
+    // Make sure the previous password is erased, if there was one.
+    size_t n = pn_string_size(connection->auth_password);
+    const char* s = pn_string_get(connection->auth_password);
+    if (n > 0 && s) memset((void*)s, 0, n);
     pn_string_set(connection->auth_password, password);
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[6/6] qpid-proton git commit: PROTON-1306: Go container improvements and client-server example.

Posted by ac...@apache.org.
PROTON-1306: Go container improvements and client-server example.

Added convenience ops to Container: Dial and Accept.
Added client-server documentation example.
Added Container.String(), improve other proton String() funcs.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6134e216
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6134e216
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6134e216

Branch: refs/heads/master
Commit: 6134e216caf952aa031a49ab09d3e59ba71b1965
Parents: 77b907b
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Sep 9 12:37:01 2016 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Sep 20 17:43:35 2016 -0400

----------------------------------------------------------------------
 examples/go/README.md                           | 104 +++++++++++--------
 examples/go/electron/broker.go                  |   9 +-
 examples/go/electron/receive.go                 |   8 +-
 examples/go/electron/send.go                    |   9 +-
 .../bindings/go/src/qpid.apache.org/README.md   |   5 +-
 .../go/src/qpid.apache.org/amqp/message.go      |   2 +
 .../src/qpid.apache.org/electron/connection.go  |  35 ++++++-
 .../src/qpid.apache.org/electron/container.go   |  52 +++++++---
 .../go/src/qpid.apache.org/electron/doc.go      |  10 ++
 .../electron/ex_client_server_test.go           |  81 +++++++++++++++
 .../go/src/qpid.apache.org/electron/sender.go   |  10 +-
 .../go/src/qpid.apache.org/proton/wrappers.go   |   4 +-
 12 files changed, 237 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/examples/go/README.md
----------------------------------------------------------------------
diff --git a/examples/go/README.md b/examples/go/README.md
index 9ba497b..24f4d2a 100644
--- a/examples/go/README.md
+++ b/examples/go/README.md
@@ -1,36 +1,34 @@
-# Go examples for proton
+# Go examples
 
-There are 3 Go packages for proton:
+## Electron examples
 
-- qpid.apache.org/electron: Concurrent, procedural API for messaging clients and servers.
-- qpid.apache.org/proton: Direct access to the event-driven, concurrent-unsafe proton library.
-- qpid.apache.org/amqp: Convert AMQP messages and data to and from Go data types.
+[qpid.apache.org/electron](http://godoc.org/qpid.apache.org/electron) is a
+simple API for writing concurrent AMQP clients and servers.
 
-`proton` and `electron` are alternative APIs for sending messages. `proton` is a
-direct wrapping of the concurrent-unsafe, event-driven C proton API. `electron`
-is a procedural, concurrent-safe interface that may be more convenient and
-familiar for Go programmers. The examples `proton/broker.go` and
-`electron/broker.go` give an illustration of how the APIs differ.
-
-## Example programs
-
-electron
 - [receive.go](electron/receive.go) receive from many connections concurrently.
 - [send.go](electron/send.go) send to many connections concurrently.
 - [broker.go](electron/broker.go) a simple broker using the electron API
+
+## Proton examples
+
+[qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an
+event-driven, concurrent-unsafe Go wrapper for the proton-C library. The
+[electron](http://godoc.org/qpid.apache.org/electron) package provides a more
+Go-friendly concurrent API built on top of proton.
 
-proton
 - [broker.go](proton/broker.go) a simple broker using the proton API
 
+See [A Tale of Two Brokers](#a-tale-of-two-brokers) for a comparison of the two APIs.
+
 ## Using the Go packages
 
-If you have the proton C library and headers installed you can get the latest go
+If you have the proton-C library and headers installed you can get the latest go
 packages with
 
     go get qpid.apache.org/electron
 
-If proton is installed in a non-standard place (other than /usr or /usr/local) you
-can set these environment variables before `go get`, for example:
+If Proton-C is installed in a non-standard place (other than /usr or /usr/local)
+you should set these environment variables before `go get`:
 
     export CGO_LDFLAGS="-L/<my-proton>/lib[64]"
     export CGO_CFLAGS="-I/<my-proton>/include"
@@ -77,43 +75,57 @@ Receive messages concurrently from "foo" and "bar". Note -count 20 for 10 messag
 The broker and clients use the standard AMQP port (5672) on the local host by
 default, to use a different address use the `-addr host:port` flag.
 
-If you have the full proton repository checked out you can try try using the
-python broker with Go clients:
+If you have other Proton examples available you can try communicating between
+programs in different languages. For example use the python broker with Go
+clients:
 
     python ../python/broker.py
+    go run send.go -count 10 localhost:/foo localhost:/bar
 
 Or use the Go broker and the python clients:
 
+    go run broker.go -debug
     python ../python/simple_send.py
     python ../python/simple_recv.py
 
 
 ## A tale of two brokers.
 
-The `proton` and `electron` packages provide two alternate APIs for AMQP applications.
-See [the proton Go README](https://github.com/apache/qpid-proton/blob/master/proton-c/bindings/go/src/qpid.apache.org/README.md) for a discussion
-of why there are two APIs.
-
-The examples `proton/broker.go` and `electron/broker.go` both implement the same
-simple broker-like functionality using each of the two APIs. They both handle
-multiple connections concurrently and store messages on bounded queues
-implemented by Go channels.
-
-However the `electron/broker` is less than half as long as the `proton/broker`
-illustrating why it is better suited for most Go applications.
-
-`proton/broker` must explicitly handle proton events, which are processed in a
-single goroutine per connection since proton is not concurrent safe. Each
-connection uses channels to exchange messages between the event-handling
-goroutine and the shared queues that are accessible to all connections. Sending
-messages is particularly tricky since we must monitor the queue for available
-messages and the sending link for available credit in order to send messages.
-
-
-`electron/broker` takes advantage of the `electron` package, which hides all the
-event handling and passing of messages between goroutines beind behind
-straightforward interfaces for sending and receiving messages. The electron
-broker can implement links as simple goroutines that loop popping messages from
-a queue and sending them or receiving messages and pushing them to a queue.
-
+The [proton](http://godoc.org/qpid.apache.org/proton) and
+[electron](http://godoc.org/qpid.apache.org/electron) packages provide two
+different APIs for building AMQP applications. For most applications,
+[electron](http://godoc.org/qpid.apache.org/electron) is easier to use.
+[The proton Go README](https://github.com/apache/qpid-proton/blob/master/proton-c/bindings/go/src/qpid.apache.org/README.md)
+has some discussion about why there are two APIs.
+
+The examples [proton/broker.go](proton/broker.go) and
+[electron/broker.go](electron/broker.go) implement the same simple broker
+functionality using each of the two APIs. They both handle multiple connections
+concurrently and store messages on bounded queues implemented by Go channels.
+
+However the [electron/broker.go](electron/broker.go) is less than half as long as the
+[proton/broker.go](proton/broker.go) illustrating why it is better suited for most Go
+applications.
+
+[proton/broker.go](proton/broker.go) implements an event-driven loop per connection that reacts
+to events like 'incoming link', 'incoming message' and 'sender has credit'.  It
+uses channels to exchange data between the event-loop goroutine for each
+connection and shared queues that are accessible to all connections. Sending
+messages is particularly tricky, the broker must monitor the queue for available
+messages and the sender link for available credit.
+
+
+[electron/broker.go](electron/broker.go) does not need any "upside-down"
+event-driven code, it is implemented as straightforward loops. The broker is a
+loop listening for connections. Each connection is a loop accepting incoming
+sender or receiver links. Each receiving link is a loop that receives a message
+and pushes it to a queue.  Each sending link is a loop that pops a message from
+a queue and sends it.
+
+Queue bounds and credit manage themselves: popping from a queue blocks till
+there is a message, sending blocks until there is credit, receiving blocks till
+something is received and pushing onto a queue blocks until there is
+space. There's no need for code that monitors the state of multiple queues and
+links. Each loop has one simple job to do, and the Go run-time schedules them
+efficiently.
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index d7aedcc..d698838 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -55,7 +55,7 @@ func main() {
 	flag.Parse()
 	b := &broker{
 		queues:    util.MakeQueues(*qsize),
-		container: electron.NewContainer(""),
+		container: electron.NewContainer(fmt.Sprintf("broker[%d]", os.Getpid())), // %d: os.Getpid() returns an int
 		acks:      make(chan electron.Outcome),
 		sent:      make(chan sentMessage),
 	}
@@ -92,16 +92,11 @@ func (b *broker) run() error {
 
 	// Start a goroutine for each new connections
 	for {
-		conn, err := listener.Accept()
+		c, err := b.container.Accept(listener)
 		if err != nil {
 			util.Debugf("Accept error: %v", err)
 			continue
 		}
-		c, err := b.container.Connection(conn, electron.Server(), electron.AllowIncoming())
-		if err != nil {
-			util.Debugf("Connection error: %v", err)
-			continue
-		}
 		cc := &connection{b, c}
 		go cc.run() // Handle the connection
 		util.Debugf("Accepted %v", c)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index 94ee509..7a505d8 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -24,9 +24,7 @@ import (
 	"flag"
 	"fmt"
 	"log"
-	"net"
 	"os"
-	"path"
 	"qpid.apache.org/amqp"
 	"qpid.apache.org/electron"
 	"sync"
@@ -59,8 +57,7 @@ func main() {
 	var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
 	wait.Add(len(urls))     // Wait for one goroutine per URL.
 
-	_, prog := path.Split(os.Args[0])
-	container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid()))
+	container := electron.NewContainer(fmt.Sprintf("receive[%d]", os.Getpid())) // %d: os.Getpid() returns an int
 	connections := make(chan electron.Connection, len(urls)) // Connections to close on exit
 
 	// Start a goroutine to for each URL to receive messages and send them to the messages channel.
@@ -74,9 +71,8 @@ func main() {
 			util.ExitIf(err)
 
 			// Open a new connection
-			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+			c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
 			util.ExitIf(err)
-			c, err := container.Connection(conn)
 			connections <- c // Save connection so we can Close() when main() ends
 
 			// Create a Receiver using the path of the URL as the source address

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index 04ec2ef..4ea93ec 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -24,9 +24,7 @@ import (
 	"flag"
 	"fmt"
 	"log"
-	"net"
 	"os"
-	"path"
 	"qpid.apache.org/amqp"
 	"qpid.apache.org/electron"
 	"sync"
@@ -58,8 +56,7 @@ func main() {
 	var wait sync.WaitGroup
 	wait.Add(len(urls)) // Wait for one goroutine per URL.
 
-	_, prog := path.Split(os.Args[0])
-	container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid()))
+	container := electron.NewContainer(fmt.Sprintf("send[%d]", os.Getpid())) // %d: os.Getpid() returns an int
 	connections := make(chan electron.Connection, len(urls)) // Connctions to close on exit
 
 	// Start a goroutine for each URL to send messages.
@@ -72,9 +69,7 @@ func main() {
 			util.ExitIf(err)
 
 			// Open a new connection
-			conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
-			util.ExitIf(err)
-			c, err := container.Connection(conn)
+			c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
 			util.ExitIf(err)
 			connections <- c // Save connection so we can Close() when main() ends
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/README.md b/proton-c/bindings/go/src/qpid.apache.org/README.md
index 4b2da12..ffd67f8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/README.md
+++ b/proton-c/bindings/go/src/qpid.apache.org/README.md
@@ -16,9 +16,10 @@ with goroutines and channels to build concurrent AMQP clients and servers.
 
 [qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an
 event-driven, concurrent-unsafe package that closely follows the proton C
-API. Most Go programmers will find the electron package easier to use.
+API. Most Go programmers will find the
+[electron](http://godoc.org/qpid.apache.org/electron) package easier to use.
 
-There are [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
+See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
 to help you get started.
 
 Feedback is encouraged at:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
index 1d1287f..4ae36f4 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
@@ -344,3 +344,5 @@ func (m *message) Encode(buffer []byte) ([]byte, error) {
 }
 
 // TODO aconway 2015-09-14: Multi-section messages.
+
+// TODO aconway 2016-09-09: Message.String() use inspect.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index d0733f2..00c08ad 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -141,6 +141,12 @@ func AllowIncoming() ConnectionOption {
 	return func(c *connection) { c.incoming = make(chan Incoming) }
 }
 
+// Parent returns a ConnectionOption that associates the Connection with its Container.
+// If not set, a connection will create its own default container.
+func Parent(cont Container) ConnectionOption {
+	return func(c *connection) { c.container = cont.(*container) }
+}
+
 type connection struct {
 	endpoint
 	connectionSettings
@@ -158,10 +164,10 @@ type connection struct {
 	defaultSession Session
 }
 
-func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) {
+// NewConnection creates a connection with the given options.
+func NewConnection(conn net.Conn, setting ...ConnectionOption) (*connection, error) {
 	c := &connection{
-		container: cont,
-		conn:      conn,
+		conn: conn,
 	}
 	c.handler = newHandler(c)
 	var err error
@@ -170,10 +176,13 @@ func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption)
 		return nil, err
 	}
 	c.pConnection = c.engine.Connection()
-	c.pConnection.SetContainer(cont.Id())
 	for _, set := range setting {
 		set(c)
 	}
+	if c.container == nil {
+		c.container = NewContainer("").(*container)
+	}
+	c.pConnection.SetContainer(c.container.Id())
 	globalSASLInit(c.engine)
 
 	c.endpoint.init(c.engine.String())
@@ -351,3 +360,21 @@ func globalSASLInit(eng *proton.Engine) {
 		sasl.ConfigPath(globalSASLConfigDir)
 	}
 }
+
+// Dial is shorthand for using net.Dial() then NewConnection()
+func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) {
+	conn, err := net.Dial(network, addr)
+	if err == nil {
+		c, err = NewConnection(conn, opts...)
+	}
+	return
+}
+
+// DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
+func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) {
+	conn, err := dialer.Dial(network, addr)
+	if err == nil {
+		c, err = NewConnection(conn, opts...)
+	}
+	return
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
index 1ab4df2..4cf5969 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
@@ -26,8 +26,11 @@ import (
 	"sync/atomic"
 )
 
-// Container is an AMQP container, it represents a single AMQP "application".It
-// provides functions to create new Connections to remote containers.
+// Container is an AMQP container, it represents a single AMQP "application"
+// which can have multiple client or server connections.
+//
+// Each Container in a distributed AMQP application must have a unique
+// container-id which is applied to its connections.
 //
 // Create with NewContainer()
 //
@@ -35,18 +38,19 @@ type Container interface {
 	// Id is a unique identifier for the container in your distributed application.
 	Id() string
 
-	// Enable AMQP over the supplied net.Conn. Returns a Connection endpoint.
-	//
-	// For client connections (e.g. established with net.Dial()), you can start
-	// using the connection immediately. Connection.Incoming() is disabled by
-	// default for clients, pass an AllowIncoming() option to enable incoming
-	// sessions and links.
-	//
-	// For server connection (e.g. established with net.Listener.Accept()) you
-	// must pass the Server() option and receive from the Connection.Incoming()
-	// channel. The first Incoming value will be an *IncomingConnection that lets
-	// you examine the connection properties before Accept() or Reject()
-	Connection(net.Conn, ...ConnectionOption) (Connection, error)
+	// Connection creates a connection associated with this container.
+	Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error)
+
+	// Dial is shorthand for
+	//     conn, err := net.Dial(); c, err := Connection(conn, opts...)
+	Dial(network string, addr string, opts ...ConnectionOption) (Connection, error)
+
+	// Accept is shorthand for:
+	//     conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server())...)
+	Accept(l net.Listener, opts ...ConnectionOption) (Connection, error)
+
+	// String returns Id()
+	String() string
 }
 
 type container struct {
@@ -73,10 +77,37 @@ func NewContainer(id string) Container {
 
 func (cont *container) Id() string { return cont.id }
 
+// String returns the container's Id().
+func (cont *container) String() string { return cont.Id() }
+
 func (cont *container) nextLinkName() string {
 	return cont.id + "@" + cont.nextTag()
 }
 
-func (cont *container) Connection(conn net.Conn, setting ...ConnectionOption) (Connection, error) {
-	return newConnection(conn, cont, setting...)
+// Connection creates a connection over conn with this container as its
+// Parent; the Parent option is appended after the caller's opts.
+func (cont *container) Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) {
+	return NewConnection(conn, append(opts, Parent(cont))...)
+}
+
+// Dial calls net.Dial(network, address) and passes the resulting net.Conn to
+// Connection() along with opts. If net.Dial fails, its error is returned with
+// a nil Connection.
+func (cont *container) Dial(network, address string, opts ...ConnectionOption) (Connection, error) {
+	conn, err := net.Dial(network, address)
+	if err != nil {
+		return nil, err
+	}
+	return cont.Connection(conn, opts...)
+}
+
+// Accept calls l.Accept() and passes the resulting net.Conn to Connection()
+// with opts followed by the Server() option. If l.Accept fails, its error is
+// returned with a nil Connection.
+func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (Connection, error) {
+	conn, err := l.Accept()
+	if err != nil {
+		return nil, err
+	}
+	return cont.Connection(conn, append(opts, Server())...)
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index 207d8ba..436e5df 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -36,6 +36,11 @@ links to Send() and Receive() messages. Connection.Incoming() lets you accept
 incoming links opened by the remote peer. You can open and accept multiple links
 in both directions on a single Connection.
 
+Some of the documentation examples show client and server side by side in a
+single program, in separate goroutines. This is only for example purposes; real
+AMQP applications would run in separate processes on the network.
+More realistic examples: https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
+
 */
 package electron
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
new file mode 100644
index 0000000..93f275b
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
@@ -0,0 +1,93 @@
+package electron_test
+
+import (
+	"fmt"
+	"net"
+	"qpid.apache.org/amqp"
+	"qpid.apache.org/electron"
+)
+
+// check prints "msg: err" if err is not nil.
+// It returns true if err is nil.
+func check(msg string, err error) bool {
+	if err != nil {
+		fmt.Printf("%s: %s\n", msg, err)
+	}
+	return err == nil
+}
+
+// runServer accepts connections from l until Accept fails. Each connection
+// and each incoming endpoint is processed in its own goroutine.
+func runServer(cont electron.Container, l net.Listener) {
+	for c, err := cont.Accept(l); check("accept connection", err); c, err = cont.Accept(l) {
+		// Pass c as an argument: the loop variable is reassigned by the next
+		// Accept while this goroutine may still be using it.
+		go func(c electron.Connection) { // Process connections concurrently, accepting AMQP endpoints
+			for in := range c.Incoming() {
+				ep := in.Accept() // Accept all endpoints
+				go func() {       // Process endpoints concurrently
+					switch ep := ep.(type) {
+					case electron.Sender:
+						m := amqp.NewMessageWith("hello yourself")
+						fmt.Printf("server %q sending %q\n", ep.Source(), m.Body())
+						ep.SendForget(m) // One-way send, client does not need to Accept.
+					case electron.Receiver:
+						if rm, err := ep.Receive(); check("server receive", err) {
+							fmt.Printf("server %q received %q\n", ep.Target(), rm.Message.Body())
+							err := rm.Accept() // Client is waiting for Accept.
+							check("accept message", err)
+						}
+					}
+				}()
+			}
+		}(c)
+	}
+}
+
+// startServer listens on a TCP address chosen by net.Listen and runs the server
+// in a goroutine. It returns the listening address, or nil if listen failed.
+func startServer() (addr net.Addr) {
+	cont := electron.NewContainer("server")
+	if l, err := net.Listen("tcp", ""); check("listen", err) {
+		addr = l.Addr()
+		go runServer(cont, l)
+	}
+	return addr
+}
+
+// client connects to addr, sends one message and waits for the server to
+// accept it, then receives one message. Errors are printed, not returned.
+func client(addr net.Addr) {
+	if c, err := electron.Dial(addr.Network(), addr.String()); check("dial", err) {
+		defer c.Close(nil)
+		if s, err := c.Sender(electron.Target("target")); check("sender", err) {
+			fmt.Printf("client sending\n")
+			out := s.SendSync(amqp.NewMessageWith("hello")) // Send and wait for server to Accept()
+			check("send", out.Error)
+		}
+		if r, err := c.Receiver(electron.Source("source")); check("receiver", err) {
+			if rm, err := r.Receive(); check("client receive", err) {
+				fmt.Printf("client received %q\n", rm.Message.Body())
+			}
+		}
+	}
+}
+
+// Example client and server communicating via AMQP over a TCP/IP connection.
+//
+// Normally client and server would be separate processes.
+// For more realistic examples:
+//     https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
+//
+func Example_clientServer() {
+	addr := startServer()
+	if addr == nil {
+		return // Listen failed, the error has already been printed.
+	}
+	client(addr)
+	// Output:
+	// client sending
+	// server "target" received "hello"
+	// server "source" sending "hello yourself"
+	// client received "hello yourself"
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 8badf35..f46fdc4 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -34,7 +34,7 @@ import (
 // The result of sending a message is provided by an Outcome value.
 //
 // A sender can buffer messages up to the credit limit provided by the remote receiver.
-// Send* methods will block if the buffer is full until there is space.
+// All the Send* methods will block if the buffer is full until there is space.
 // Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.
 //
 type Sender interface {
@@ -47,10 +47,14 @@ type Sender interface {
 
 	// SendWaitable puts a message in the send buffer and returns a channel that
 	// you can use to wait for the Outcome of just that message. The channel is
-	// buffered so you can receive from it whenever you want without blocking anything.
+	// buffered so you can receive from it whenever you want without blocking.
+	//
+	// Note: can block if there is no space to buffer the message.
 	SendWaitable(m amqp.Message) <-chan Outcome
 
 	// SendForget buffers a message for sending and returns, with no notification of the outcome.
+	//
+	// Note: can block if there is no space to buffer the message.
 	SendForget(m amqp.Message)
 
 	// SendAsync puts a message in the send buffer and returns immediately.  An
@@ -63,6 +67,8 @@ type Sender interface {
 	// goroutines to avoid blocking the connection.
 	//
 	// If ack == nil no Outcome is sent.
+	//
+	// Note: can block if there is no space to buffer the message.
 	SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})
 
 	SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6134e216/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 3303f0a..fa3e850 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -294,7 +294,7 @@ func (s Session) Receiver(name string) Link {
 // Unique (per process) string identifier for a connection, useful for debugging.
 func (c Connection) String() string {
 	// Use the transport address to match the default transport logs from PN_TRACE.
-	return fmt.Sprintf("%p", c.Transport().CPtr())
+	return fmt.Sprintf("(Connection)(%p)", c.Transport().CPtr())
 }
 
 func (c Connection) Type() string {
@@ -338,7 +338,7 @@ func (c Connection) SetPassword(password []byte) {
 }
 
 func (s Session) String() string {
-	return fmt.Sprintf("%s/%p", s.Connection(), s.pn)
+	return fmt.Sprintf("(Session)(%p)", s.pn) // TODO aconway 2016-09-12: should print channel number.
 }
 
 func (s Session) Type() string { return "session" }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org