You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/10/11 22:21:33 UTC

[01/12] qpid-proton git commit: PROTON-1953: [go] occasional client/server hang with high volume of messages

Repository: qpid-proton
Updated Branches:
  refs/heads/master e769f784e -> 4a9f3b986


PROTON-1953: [go] occasional client/server hang with high volume of messages

Due to race/deadlock in read/write and engine main  goroutine. Simplified:
- start read/write goroutines as needed
- handle read/write completion via Inject(), no special channels


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/486fbaf0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/486fbaf0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/486fbaf0

Branch: refs/heads/master
Commit: 486fbaf034f2d89688bb678914d798a1a1595cc5
Parents: e769f78
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Oct 10 16:09:33 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 11:16:37 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/proton/engine.go | 173 ++++++++++-----------------
 1 file changed, 64 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/486fbaf0/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/engine.go b/go/src/qpid.apache.org/proton/engine.go
index d28a09f..af26a5f 100644
--- a/go/src/qpid.apache.org/proton/engine.go
+++ b/go/src/qpid.apache.org/proton/engine.go
@@ -103,15 +103,16 @@ type Engine struct {
 	err    ErrorHolder
 	inject chan func()
 
-	conn       net.Conn
-	connection Connection
-	transport  Transport
-	collector  *C.pn_collector_t
-	handlers   []EventHandler // Handlers for proton events.
-	running    chan struct{}  // This channel will be closed when the goroutines are done.
-	closeOnce  sync.Once
-	timer      *time.Timer
-	traceEvent bool
+	conn             net.Conn
+	connection       Connection
+	transport        Transport
+	collector        *C.pn_collector_t
+	handlers         []EventHandler // Handlers for proton events.
+	running          chan struct{}  // This channel will be closed when the goroutines are done.
+	closeOnce        sync.Once
+	timer            *time.Timer
+	traceEvent       bool
+	reading, writing bool
 }
 
 const bufferSize = 4096
@@ -255,6 +256,12 @@ func (eng *Engine) Disconnect(err error) {
 	<-eng.running
 }
 
+// For debugging purposes: like Transport.Log() but takes a format string
+// and works even if the transport has been freed.
+func (eng *Engine) log(format string, args ...interface{}) {
+	fmt.Fprintf(os.Stderr, "[%p]: %v", eng.transport, fmt.Sprintf(format, args...))
+}
+
 // Let proton run timed activity and set up the next tick
 func (eng *Engine) tick() {
 	now := time.Now()
@@ -281,16 +288,50 @@ func (eng *Engine) dispatch() bool {
 	return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != nil
 }
 
-func (eng *Engine) writeBuffer() []byte {
-	size := eng.Transport().Pending() // Evaluate before Head(), may change buffer.
-	start := eng.Transport().Head()
-	return cByteSlice(start, size)
+func (eng *Engine) write() {
+	if !eng.writing {
+		size := eng.Transport().Pending() // Evaluate before Head(), may change buffer.
+		start := eng.Transport().Head()
+		if size > 0 {
+			eng.writing = true
+			go func() { // Blocking Write() in separate goroutineb
+				n, err := eng.conn.Write(cByteSlice(start, size))
+				eng.Inject(func() { // Inject results of Write back to engine goroutine
+					eng.writing = false
+					if n > 0 {
+						eng.transport.Pop(uint(n))
+					}
+					if err != nil {
+						eng.Transport().Condition().SetError(err)
+						eng.Transport().CloseHead()
+					}
+				})
+			}()
+		}
+	}
 }
 
-func (eng *Engine) readBuffer() []byte {
-	size := eng.Transport().Capacity()
-	start := eng.Transport().Tail()
-	return cByteSlice(start, size)
+func (eng *Engine) read() {
+	if !eng.reading {
+		size := eng.Transport().Capacity()
+		start := eng.Transport().Tail()
+		if size > 0 {
+			eng.reading = true
+			go func() { // Blocking Read in separate goroutine
+				n, err := eng.conn.Read(cByteSlice(start, size))
+				eng.Inject(func() {
+					eng.reading = false
+					if n > 0 {
+						eng.Transport().Process(uint(n))
+					}
+					if err != nil {
+						eng.Transport().Condition().SetError(err)
+						eng.Transport().CloseTail()
+					}
+				})
+			}()
+		}
+	}
 }
 
 func (eng *Engine) free() {
@@ -317,106 +358,20 @@ func (eng *Engine) Run() error {
 	defer eng.free()
 	eng.transport.Bind(eng.connection)
 	eng.tick() // Start ticking if needed
-
-	// Channels for read and write buffers going in and out of the read/write goroutines.
-	// The channels are unbuffered: we want to exchange buffers in sequence.
-	readsIn, writesIn := make(chan []byte), make(chan []byte)
-	readsOut, writesOut := make(chan []byte), make(chan []byte)
-
-	wait := sync.WaitGroup{}
-	wait.Add(2) // Read and write goroutines
-
-	go func() { // Read goroutine
-		defer wait.Done()
-		for {
-			rbuf, ok := <-readsIn
-			if !ok {
-				return
-			}
-			n, err := eng.conn.Read(rbuf)
-			if n > 0 {
-				readsOut <- rbuf[:n]
-			} else if err != nil {
-				_ = eng.Inject(func() {
-					eng.Transport().Condition().SetError(err)
-					eng.Transport().CloseTail()
-				})
-				return
-			}
-		}
-	}()
-
-	go func() { // Write goroutine
-		defer wait.Done()
-		for {
-			wbuf, ok := <-writesIn
-			if !ok {
-				return
-			}
-			n, err := eng.conn.Write(wbuf)
-			if n > 0 {
-				writesOut <- wbuf[:n]
-			} else if err != nil {
-				_ = eng.Inject(func() {
-					eng.Transport().Condition().SetError(err)
-					eng.Transport().CloseHead()
-				})
-				return
-			}
-		}
-	}()
-
 	for eng.dispatch() {
-		readBuf := eng.readBuffer()
-		writeBuf := eng.writeBuffer()
-		// Note that getting the buffers can generate events (eg. SASL events) that
-		// might close the transport. Check if we are already finished before
-		// blocking for IO.
-		if !eng.dispatch() {
-			break
-		}
-
-		// sendReads/sendWrites are nil (not sendable in select) unless we have a
-		// buffer to read/write
-		var sendReads, sendWrites chan []byte
-		if readBuf != nil {
-			sendReads = readsIn
-		}
-		if writeBuf != nil {
-			sendWrites = writesIn
-		}
-
-		// Send buffers to the read/write goroutines if we have them.
-		// Get buffers from the read/write goroutines and process them
-		// Check for injected functions
+		// Initiate read/write if needed
+		eng.read()
+		eng.write()
 		select {
-
-		case sendReads <- readBuf:
-
-		case sendWrites <- writeBuf:
-
-		case buf := <-readsOut:
-			eng.transport.Process(uint(len(buf)))
-
-		case buf := <-writesOut:
-			eng.transport.Pop(uint(len(buf)))
-
-		case f, ok := <-eng.inject: // Function injected from another goroutine
-			if ok {
-				f()
-			}
-
+		case f := <-eng.inject: // User or IO action
+			f()
 		case <-eng.timer.C:
 			eng.tick()
 		}
 	}
-
 	eng.err.Set(EndpointError(eng.Connection()))
 	eng.err.Set(eng.Transport().Condition().Error())
-	close(readsIn)
-	close(writesIn)
 	close(eng.running)   // Signal goroutines have exited and Error is set, disable Inject()
-	_ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject)
-	wait.Wait()          // Wait for goroutines
+	_ = eng.conn.Close() // Close conn, force read goroutine to exit (Inject will fail)
 	return eng.err.Get()
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[05/12] qpid-proton git commit: PROTON-1954: [go] Container should default to random container-id

Posted by ac...@apache.org.
PROTON-1954: [go] Container should default to random container-id

- default to random container-id
- add ContainerId option to set container ID without a Container object
- set remote heartbeat on incoming connection settings


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c2491078
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c2491078
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c2491078

Branch: refs/heads/master
Commit: c24910781489de19669384180d96a23ca5a4fbcd
Parents: d9b4b98
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Sep 20 20:53:14 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:33:08 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/electron/connection.go | 24 ++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c2491078/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go
index 295dd50..0d5e7c5 100644
--- a/go/src/qpid.apache.org/electron/connection.go
+++ b/go/src/qpid.apache.org/electron/connection.go
@@ -23,6 +23,8 @@ package electron
 import "C"
 
 import (
+	"crypto/rand"
+	"encoding/hex"
 	"net"
 	"qpid.apache.org/proton"
 	"sync"
@@ -106,7 +108,7 @@ func (c connectionSettings) User() string             { return c.user }
 func (c connectionSettings) VirtualHost() string      { return c.virtualHost }
 func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
 
-// ConnectionOption can be passed when creating a connection to configure various options
+// ConnectionOption arguments can be passed when creating a connection to configure it.
 type ConnectionOption func(*connection)
 
 // User returns a ConnectionOption sets the user name for a connection
@@ -160,6 +162,12 @@ func Parent(cont Container) ConnectionOption {
 	return func(c *connection) { c.container = cont.(*container) }
 }
 
+// ContainerId returns a ConnectionOption that creates a new Container
+// with id and associates it with the connection
+func ContainerId(id string) ConnectionOption {
+	return func(c *connection) { c.container = NewContainer(id).(*container) }
+}
+
 type connection struct {
 	endpoint
 	connectionSettings
@@ -178,6 +186,7 @@ type connection struct {
 }
 
 // NewConnection creates a connection with the given options.
+// Options are applied in order.
 func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) {
 	c := &connection{
 		conn: conn,
@@ -193,7 +202,12 @@ func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)
 		set(c)
 	}
 	if c.container == nil {
-		c.container = NewContainer("").(*container)
+		// Generate a random container-id. Not an RFC4122-compliant UUID but probably-unique
+		id := make([]byte, 16)
+		if _, err = rand.Read(id); err != nil {
+			return nil, err
+		}
+		c.container = NewContainer(hex.EncodeToString(id)).(*container)
 	}
 	c.pConnection.SetContainer(c.container.Id())
 	saslConfig.setup(c.engine)
@@ -294,14 +308,16 @@ type IncomingConnection struct {
 func newIncomingConnection(c *connection) *IncomingConnection {
 	c.user = c.pConnection.Transport().User()
 	c.virtualHost = c.pConnection.RemoteHostname()
+	c.heartbeat = c.pConnection.Transport().RemoteIdleTimeout()
 	return &IncomingConnection{
 		incoming:           makeIncoming(c.pConnection),
 		connectionSettings: c.connectionSettings,
 		c:                  c}
 }
 
-// AcceptConnection is like Accept() but takes ConnectionOption s
-// For example you can set the Heartbeat() for the accepted connection.
+// AcceptConnection is like Accept() but takes ConnectionOption
+// arguments like NewConnection(). For example you can set the
+// Heartbeat() for the incoming connection.
 func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection {
 	return in.accept(func() Endpoint {
 		for _, opt := range opts {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[02/12] qpid-proton git commit: PROTON-1910: [go] test refactor and benchmarks

Posted by ac...@apache.org.
PROTON-1910: [go] test refactor and benchmarks

- Simplify commmon test tools, move to commmon_test.go
- Use net.Pipe for most tests, more efficient than a full network socket
- Added simple benchmarks


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d9b4b989
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d9b4b989
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d9b4b989

Branch: refs/heads/master
Commit: d9b4b9893412676e947d693a7d4e336fad4d4110
Parents: 486fbaf
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Sep 21 00:51:45 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:32:59 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/message_test.go     | 138 ++++++--
 go/src/qpid.apache.org/electron/auth_test.go    |  39 +--
 .../qpid.apache.org/electron/benchmark_test.go  | 132 ++++++++
 go/src/qpid.apache.org/electron/common_test.go  | 148 ++++++++
 .../qpid.apache.org/electron/electron_test.go   | 337 ++++---------------
 5 files changed, 459 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go
index 663e82f..b22c60d 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -20,6 +20,7 @@ under the License.
 package amqp
 
 import (
+	"reflect"
 	"testing"
 	"time"
 )
@@ -28,9 +29,7 @@ func roundTrip(m Message) error {
 	var err error
 	if buffer, err := m.Encode(nil); err == nil {
 		if m2, err := DecodeMessage(buffer); err == nil {
-			if err = checkEqual(m, m2); err == nil {
-				err = checkEqual(m.String(), m2.String())
-			}
+			err = checkEqual(m, m2)
 		}
 	}
 	return err
@@ -38,35 +37,40 @@ func roundTrip(m Message) error {
 
 func TestDefaultMessage(t *testing.T) {
 	m := NewMessage()
+	mv := reflect.ValueOf(m)
 	// Check defaults
-	for _, data := range [][]interface{}{
-		{m.Inferred(), false},
-		{m.Durable(), false},
-		{m.Priority(), uint8(4)},
-		{m.TTL(), time.Duration(0)},
-		{m.UserId(), ""},
-		{m.Address(), ""},
-		{m.Subject(), ""},
-		{m.ReplyTo(), ""},
-		{m.ContentType(), ""},
-		{m.ContentEncoding(), ""},
-		{m.GroupId(), ""},
-		{m.GroupSequence(), int32(0)},
-		{m.ReplyToGroupId(), ""},
-		{m.MessageId(), nil},
-		{m.CorrelationId(), nil},
-		{m.DeliveryAnnotations(), map[AnnotationKey]interface{}(nil)},
-		{m.MessageAnnotations(), map[AnnotationKey]interface{}(nil)},
-		{m.ApplicationProperties(), map[string]interface{}(nil)},
+	for _, x := range []struct {
+		method string
+		want   interface{}
+	}{
+		{"Inferred", false},
+		{"Durable", false},
+		{"Priority", uint8(4)},
+		{"TTL", time.Duration(0)},
+		{"UserId", ""},
+		{"Address", ""},
+		{"Subject", ""},
+		{"ReplyTo", ""},
+		{"ContentType", ""},
+		{"ContentEncoding", ""},
+		{"GroupId", ""},
+		{"GroupSequence", int32(0)},
+		{"ReplyToGroupId", ""},
+		{"MessageId", nil},
+		{"CorrelationId", nil},
+		{"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
+		{"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
+		{"ApplicationProperties", map[string]interface{}(nil)},
 
 		// Deprecated
-		{m.Instructions(), map[string]interface{}(nil)},
-		{m.Annotations(), map[string]interface{}(nil)},
-		{m.Properties(), map[string]interface{}(nil)},
-		{m.Body(), nil},
+		{"Instructions", map[string]interface{}(nil)},
+		{"Annotations", map[string]interface{}(nil)},
+		{"Properties", map[string]interface{}(nil)},
+		{"Body", nil},
 	} {
-		if err := checkEqual(data[0], data[1]); err != nil {
-			t.Error(err)
+		ret := mv.MethodByName(x.method).Call(nil)
+		if err := checkEqual(x.want, ret[0].Interface()); err != nil {
+			t.Errorf("%s: %s", x.method, err)
 		}
 	}
 	if err := roundTrip(m); err != nil {
@@ -84,14 +88,17 @@ func TestMessageString(t *testing.T) {
 	m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"})
 	m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"})
 	m.SetApplicationProperties(map[string]interface{}{"int": int32(32)})
+	if err := roundTrip(m); err != nil {
+		t.Error(err)
+	}
 	msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
 	if err := checkEqual(msgstr, m.String()); err != nil {
 		t.Error(err)
 	}
 }
 
-func TestMessageRoundTrip(t *testing.T) {
-	m := NewMessage()
+// Set all message properties
+func setMessageProperties(m Message) Message {
 	m.SetInferred(false)
 	m.SetDurable(true)
 	m.SetPriority(42)
@@ -110,7 +117,25 @@ func TestMessageRoundTrip(t *testing.T) {
 	m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"})
 	m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"})
 	m.SetApplicationProperties(map[string]interface{}{"int": int32(32), "bool": true})
-	m.Marshal("hello")
+	return m
+}
+
+func TestMessageRoundTrip(t *testing.T) {
+	m1 := NewMessage()
+	setMessageProperties(m1)
+	m1.Marshal("hello")
+
+	buffer, err := m1.Encode(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	m, err := DecodeMessage(buffer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err = checkEqual(m1, m); err != nil {
+		t.Error(err)
+	}
 
 	for _, data := range [][]interface{}{
 		{m.Inferred(), false},
@@ -215,3 +240,54 @@ func TestMessageBodyTypes(t *testing.T) {
 
 	// TODO aconway 2015-09-08: array etc.
 }
+
+// Benchmarks assign to package-scope variables to prevent being optimized out.
+var bmM Message
+var bmBuf []byte
+
+func BenchmarkNewMessageEmpty(b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		bmM = NewMessage()
+	}
+}
+
+func BenchmarkNewMessageString(b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		bmM = NewMessageWith("hello")
+	}
+}
+
+func BenchmarkNewMessageAll(b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		bmM = setMessageProperties(NewMessageWith("hello"))
+	}
+}
+
+func BenchmarkEncode(b *testing.B) {
+	m := setMessageProperties(NewMessageWith("hello"))
+	var buf []byte
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		buf, err := m.Encode(buf)
+		if err != nil {
+			b.Fatal(err)
+		}
+		bmBuf = buf
+	}
+}
+
+func BenchmarkDecode(b *testing.B) {
+	var buf []byte
+	buf, err := setMessageProperties(NewMessageWith("hello")).Encode(buf)
+	if err != nil {
+		b.Fatal(err)
+	}
+	m := NewMessage()
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		if err := m.Decode(buf); err != nil {
+			b.Fatal(err)
+		}
+		bmM = m
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/auth_test.go b/go/src/qpid.apache.org/electron/auth_test.go
index 162b366..30ef401 100644
--- a/go/src/qpid.apache.org/electron/auth_test.go
+++ b/go/src/qpid.apache.org/electron/auth_test.go
@@ -29,57 +29,40 @@ import (
 	"testing"
 )
 
-func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) {
-	client, server := newClientServerOpts(t, copts, sopts)
-	defer closeClientServer(client, server)
-
-	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingConnection:
-				got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()}
-			}
-			in.Accept()
-		}
-	}()
-
-	err = client.Sync()
-	return
-}
-
 func TestAuthAnonymous(t *testing.T) {
-	got, err := testAuthClientServer(t,
+	p := newPipe(t,
 		[]ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
 		[]ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
-	fatalIf(t, err)
-	errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got))
+	fatalIf(t, p.server.Sync())
+	errorIf(t, checkEqual("anonymous", p.server.User()))
+	errorIf(t, checkEqual("vhost", p.server.VirtualHost()))
 }
 
 func TestAuthPlain(t *testing.T) {
 	extendedSASL.startTest(t)
-	got, err := testAuthClientServer(t,
+	p := newPipe(t,
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
-	fatalIf(t, err)
-	errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
+	fatalIf(t, p.server.Sync())
+	errorIf(t, checkEqual("fred@proton", p.server.User()))
 }
 
 func TestAuthBadPass(t *testing.T) {
 	extendedSASL.startTest(t)
-	_, err := testAuthClientServer(t,
+	p := newPipe(t,
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
-	if err == nil {
+	if p.server.Sync() == nil {
 		t.Error("Expected auth failure for bad pass")
 	}
 }
 
 func TestAuthBadUser(t *testing.T) {
 	extendedSASL.startTest(t)
-	_, err := testAuthClientServer(t,
+	p := newPipe(t,
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
-	if err == nil {
+	if p.server.Sync() == nil {
 		t.Error("Expected auth failure for bad user")
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/benchmark_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/benchmark_test.go b/go/src/qpid.apache.org/electron/benchmark_test.go
new file mode 100644
index 0000000..ae9d47c
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/benchmark_test.go
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"flag"
+	"strings"
+	"sync"
+	"testing"
+
+	"qpid.apache.org/amqp"
+)
+
+// To change capacity use
+//     go test -bench=. -args -capacity 100
+var capacity = flag.Int("capacity", 1000, "Prefetch capacity")
+var bodySize = flag.Int("bodySize", 1000, "Message body size")
+
+type bmCommon struct {
+	b    *testing.B
+	p    *pair
+	s    Sender
+	r    Receiver
+	ack  chan Outcome
+	done sync.WaitGroup
+}
+
+func makeBmCommon(p *pair, waitCount int) bmCommon {
+	bm := bmCommon{p: p, b: p.t.(*testing.B)}
+	bm.p.capacity = *capacity
+	bm.p.prefetch = true
+	bm.s, bm.r = p.sender()
+	bm.ack = make(chan Outcome, *capacity)
+	bm.done.Add(waitCount)
+	bm.b.ResetTimer()
+	return bm
+}
+
+func (bm *bmCommon) receiveAccept() {
+	defer bm.done.Done()
+	for n := 0; n < bm.b.N; n++ {
+		if rm, err := bm.r.Receive(); err != nil {
+			bm.b.Fatal(err)
+		} else {
+			fatalIf(bm.b, rm.Accept())
+		}
+	}
+}
+
+func (bm *bmCommon) outcomes() {
+	defer bm.done.Done()
+	for n := 0; n < bm.b.N; n++ {
+		fatalIf(bm.b, (<-bm.ack).Error)
+	}
+}
+
+var emptyMsg = amqp.NewMessage()
+
+func BenchmarkSendForget(b *testing.B) {
+	bm := makeBmCommon(newPipe(b, nil, nil), 1)
+	defer bm.p.close()
+
+	go func() { // Receive, no ack
+		defer bm.done.Done()
+		for n := 0; n < b.N; n++ {
+			if _, err := bm.r.Receive(); err != nil {
+				b.Fatal(err)
+			}
+		}
+	}()
+
+	for n := 0; n < b.N; n++ {
+		bm.s.SendForget(emptyMsg)
+	}
+	bm.done.Wait()
+}
+
+func BenchmarkSendSync(b *testing.B) {
+	bm := makeBmCommon(newPipe(b, nil, nil), 1)
+	defer bm.p.close()
+
+	go bm.receiveAccept()
+	for n := 0; n < b.N; n++ {
+		fatalIf(b, bm.s.SendSync(emptyMsg).Error)
+	}
+	bm.done.Wait()
+}
+
+func BenchmarkSendAsync(b *testing.B) {
+	bm := makeBmCommon(newPipe(b, nil, nil), 2)
+	defer bm.p.close()
+
+	go bm.outcomes()      // Handle outcomes
+	go bm.receiveAccept() // Receive
+	for n := 0; n < b.N; n++ {
+		bm.s.SendAsync(emptyMsg, bm.ack, nil)
+	}
+	bm.done.Wait()
+}
+
+// Create a new message for each send, with body and property.
+func BenchmarkSendAsyncNewMessage(b *testing.B) {
+	body := strings.Repeat("x", *bodySize)
+	bm := makeBmCommon(newPipe(b, nil, nil), 2)
+	defer bm.p.close()
+
+	go bm.outcomes()      // Handle outcomes
+	go bm.receiveAccept() // Receive
+	for n := 0; n < b.N; n++ {
+		msg := amqp.NewMessageWith(body)
+		msg.SetApplicationProperties(map[string]interface{}{"prop": "value"})
+		bm.s.SendAsync(msg, bm.ack, nil)
+	}
+	bm.done.Wait()
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/common_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go
new file mode 100644
index 0000000..3aad825
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/common_test.go
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"fmt"
+	"net"
+	"path"
+	"reflect"
+	"runtime"
+	"testing"
+)
+
+func decorate(err error, callDepth int) string {
+	_, file, line, _ := runtime.Caller(callDepth + 1) // annotate with location of caller.
+	_, file = path.Split(file)
+	return fmt.Sprintf("\n%s:%d: %v", file, line, err)
+}
+
+func fatalIfN(t testing.TB, err error, callDepth int) {
+	if err != nil {
+		t.Fatal(decorate(err, callDepth+1))
+	}
+}
+
+func fatalIf(t testing.TB, err error) {
+	fatalIfN(t, err, 1)
+}
+
+func errorIf(t testing.TB, err error) {
+	if err != nil {
+		t.Errorf(decorate(err, 1))
+	}
+}
+
+func checkEqual(want interface{}, got interface{}) error {
+	if !reflect.DeepEqual(want, got) {
+		return fmt.Errorf("(%#v != %#v)", want, got)
+	}
+	return nil
+}
+
+// AMQP client/server pair
+type pair struct {
+	t        testing.TB
+	client   Session
+	server   Connection
+	capacity int
+	prefetch bool
+	rchan    chan Receiver
+	schan    chan Sender
+	auth     connectionSettings
+}
+
+func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []ConnectionOption) *pair {
+	opts := append([]ConnectionOption{Server()}, serverOpts...)
+	sc, _ := NewConnection(srv, opts...)
+	opts = append([]ConnectionOption{}, clientOpts...)
+	cc, _ := NewConnection(cli, opts...)
+	cs, _ := cc.Session()
+	p := &pair{
+		t:        t,
+		client:   cs,
+		server:   sc,
+		capacity: 100,
+		rchan:    make(chan Receiver),
+		schan:    make(chan Sender)}
+
+	go func() {
+		for i := range p.server.Incoming() {
+			switch i := i.(type) {
+			case *IncomingReceiver:
+				if p.capacity > 0 {
+					i.SetCapacity(p.capacity)
+				}
+				i.SetPrefetch(p.prefetch)
+				p.rchan <- i.Accept().(Receiver)
+				break
+			case *IncomingSender:
+				p.schan <- i.Accept().(Sender)
+			default:
+				i.Accept()
+			}
+		}
+	}()
+
+	return p
+}
+
+// AMQP pair linked by in-memory pipe
+func newPipe(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
+	cli, srv := net.Pipe()
+	return newPair(t, cli, srv, clientOpts, serverOpts)
+}
+
+// AMQP pair linked by TCP socket
+func newSocketPair(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
+	l, err := net.Listen("tcp4", ":0") // For systems with ipv6 disabled
+	fatalIfN(t, err, 1)
+	srvCh := make(chan net.Conn)
+	var srvErr error
+	go func() {
+		var c net.Conn
+		c, srvErr = l.Accept()
+		srvCh <- c
+	}()
+	addr := l.Addr()
+	cli, err := net.Dial(addr.Network(), addr.String())
+	fatalIfN(t, err, 1)
+	srv := <-srvCh
+	fatalIfN(t, srvErr, 1)
+	return newPair(t, cli, srv, clientOpts, serverOpts)
+}
+
+func (p *pair) close() { p.client.Connection().Close(nil); p.server.Close(nil) }
+
+// Return a client sender and server receiver
+func (p *pair) sender(opts ...LinkOption) (Sender, Receiver) {
+	snd, err := p.client.Sender(opts...)
+	fatalIfN(p.t, err, 2)
+	rcv := <-p.rchan
+	return snd, rcv
+}
+
+// Return a client receiver and server sender
+func (p *pair) receiver(opts ...LinkOption) (Receiver, Sender) {
+	rcv, err := p.client.Receiver(opts...)
+	fatalIfN(p.t, err, 2)
+	snd := <-p.schan
+	return rcv, snd
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/electron_test.go b/go/src/qpid.apache.org/electron/electron_test.go
index 74759f5..c8a51c7 100644
--- a/go/src/qpid.apache.org/electron/electron_test.go
+++ b/go/src/qpid.apache.org/electron/electron_test.go
@@ -21,119 +21,23 @@ package electron
 
 import (
 	"fmt"
-	"net"
-	"path"
 	"qpid.apache.org/amqp"
-	"reflect"
-	"runtime"
 	"testing"
 	"time"
 )
 
-func fatalIf(t *testing.T, err error) {
-	if err != nil {
-		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
-		if ok {
-			_, file = path.Split(file)
-		}
-		t.Fatalf("(from %s:%d) %v", file, line, err)
-	}
-}
-
-func errorIf(t *testing.T, err error) {
-	if err != nil {
-		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
-		if ok {
-			_, file = path.Split(file)
-		}
-		t.Errorf("(from %s:%d) %v", file, line, err)
-	}
-}
-
-func checkEqual(want interface{}, got interface{}) error {
-	if !reflect.DeepEqual(want, got) {
-		return fmt.Errorf("%#v != %#v", want, got)
-	}
-	return nil
-}
-
-// Start a server, return listening addr and channel for incoming Connections.
-func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr, <-chan Connection) {
-	listener, err := net.Listen("tcp4", "") // For systems with ipv6 disabled
-	fatalIf(t, err)
-	addr := listener.Addr()
-	ch := make(chan Connection)
-	go func() {
-		conn, err := listener.Accept()
-		c, err := cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...)
-		fatalIf(t, err)
-		ch <- c
-	}()
-	return addr, ch
-}
-
-// Open a client connection and session, return the session.
-func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session {
-	conn, err := net.Dial(addr.Network(), addr.String())
-	fatalIf(t, err)
-	// Don't  bother checking error here, it's an async error so it's racy to do so anyway.
-	// Let caller use Sync() or catch it on first use.
-	c, _ := cont.Connection(conn, opts...)
-	sn, _ := c.Session()
-	return sn
-}
-
-// Return client and server ends of the same connection.
-func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (client Session, server Connection) {
-	addr, ch := newServer(t, NewContainer("test-server"), sopts...)
-	client = newClient(t, NewContainer("test-client"), addr, copts...)
-	return client, <-ch
-}
-
-// Return client and server ends of the same connection.
-func newClientServer(t *testing.T) (client Session, server Connection) {
-	return newClientServerOpts(t, nil, nil)
-}
-
-// Close client and server
-func closeClientServer(client Session, server Connection) {
-	client.Connection().Close(nil)
-	server.Close(nil)
-}
-
 // Send a message one way with a client sender and server receiver, verify ack.
-func TestClientSendServerReceive(t *testing.T) {
+func TestClientSender(t *testing.T) {
+	p := newPipe(t, nil, nil)
+	defer func() { p.close() }()
+
 	nLinks := 3
 	nMessages := 3
 
-	rchan := make(chan Receiver, nLinks)
-	client, server := newClientServer(t)
-	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingReceiver:
-				in.SetCapacity(1)
-				in.SetPrefetch(false)
-				rchan <- in.Accept().(Receiver)
-			default:
-				in.Accept()
-			}
-		}
-	}()
-
-	defer func() { closeClientServer(client, server) }()
-
 	s := make([]Sender, nLinks)
-	for i := 0; i < nLinks; i++ {
-		var err error
-		s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
 	r := make([]Receiver, nLinks)
 	for i := 0; i < nLinks; i++ {
-		r[i] = <-rchan
+		s[i], r[i] = p.sender(Target(fmt.Sprintf("foo%d", i)))
 	}
 
 	for i := 0; i < nLinks; i++ {
@@ -146,16 +50,12 @@ func TestClientSendServerReceive(t *testing.T) {
 				m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
 				var err error
 				s[i].SendAsync(m, ack, "testing")
-				if err != nil {
-					t.Fatal(err)
-				}
+				fatalIf(t, err)
 			}()
 
 			// Server receive
 			rm, err := r[i].Receive()
-			if err != nil {
-				t.Fatal(err)
-			}
+			fatalIf(t, err)
 			if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
 				t.Errorf("%#v != %#v", want, got)
 			}
@@ -182,76 +82,28 @@ func TestClientSendServerReceive(t *testing.T) {
 
 func TestClientReceiver(t *testing.T) {
 	nMessages := 3
-	client, server := newClientServer(t)
+	p := newPipe(t, nil, nil)
+	defer func() { p.close() }()
+	r, s := p.receiver(Source("foo"), Capacity(nMessages), Prefetch(true))
 	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingSender:
-				s := in.Accept().(Sender)
-				go func() {
-					for i := int32(0); i < int32(nMessages); i++ {
-						out := s.SendSync(amqp.NewMessageWith(i))
-						if out.Error != nil {
-							t.Error(out.Error)
-							return
-						}
-					}
-					s.Close(nil)
-				}()
-			default:
-				in.Accept()
-			}
+		for i := 0; i < nMessages; i++ { // Server sends
+			out := s.SendSync(amqp.NewMessageWith(int32(i)))
+			fatalIf(t, out.Error)
 		}
 	}()
-
-	r, err := client.Receiver(Source("foo"))
-	if err != nil {
-		t.Fatal(err)
-	}
-	for i := int32(0); i < int32(nMessages); i++ {
+	for i := 0; i < nMessages; i++ { // Client receives
 		rm, err := r.Receive()
-		if err != nil {
-			if err != Closed {
-				t.Error(err)
-			}
-			break
-		}
-		if err := rm.Accept(); err != nil {
-			t.Error(err)
-		}
-		if b, ok := rm.Message.Body().(int32); !ok || b != i {
-			t.Errorf("want %v, true got %v, %v", i, b, ok)
-		}
+		fatalIf(t, err)
+		errorIf(t, checkEqual(int32(i), rm.Message.Body()))
+		errorIf(t, rm.Accept())
 	}
-	server.Close(nil)
-	client.Connection().Close(nil)
 }
 
 // Test timeout versions of waiting functions.
 func TestTimeouts(t *testing.T) {
-	var err error
-	rchan := make(chan Receiver, 1)
-	client, server := newClientServer(t)
-	go func() {
-		for i := range server.Incoming() {
-			switch i := i.(type) {
-			case *IncomingReceiver:
-				i.SetCapacity(1)
-				i.SetPrefetch(false)
-				rchan <- i.Accept().(Receiver) // Issue credit only on receive
-			default:
-				i.Accept()
-			}
-		}
-	}()
-	defer func() { closeClientServer(client, server) }()
-
-	// Open client sender
-	snd, err := client.Sender(Target("test"))
-	if err != nil {
-		t.Fatal(err)
-	}
-	rcv := <-rchan
+	p := newPipe(t, nil, nil)
+	defer func() { p.close() }()
+	snd, rcv := p.sender(Target("test"))
 
 	// Test send with timeout
 	short := time.Millisecond
@@ -264,11 +116,11 @@ func TestTimeouts(t *testing.T) {
 		t.Error("want Timeout got", err)
 	}
 	// Test receive with timeout
-	if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
+	if _, err := rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
 		t.Error("want Timeout got", err)
 	}
 	// Test receive with timeout
-	if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
+	if _, err := rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
 		t.Error("want Timeout got", err)
 	}
 	// There is now a credit on the link due to receive
@@ -295,57 +147,6 @@ func TestTimeouts(t *testing.T) {
 	}
 }
 
-// A server that returns the opposite end of each client link via channels.
-type pairs struct {
-	t        *testing.T
-	client   Session
-	server   Connection
-	rchan    chan Receiver
-	schan    chan Sender
-	capacity int
-	prefetch bool
-}
-
-func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
-	p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
-	p.client, p.server = newClientServer(t)
-	go func() {
-		for i := range p.server.Incoming() {
-			switch i := i.(type) {
-			case *IncomingReceiver:
-				i.SetCapacity(capacity)
-				i.SetPrefetch(prefetch)
-				p.rchan <- i.Accept().(Receiver)
-			case *IncomingSender:
-				p.schan <- i.Accept().(Sender)
-			default:
-				i.Accept()
-			}
-		}
-	}()
-	return p
-}
-
-func (p *pairs) close() {
-	closeClientServer(p.client, p.server)
-}
-
-// Return a client sender and server receiver
-func (p *pairs) senderReceiver() (Sender, Receiver) {
-	snd, err := p.client.Sender()
-	fatalIf(p.t, err)
-	rcv := <-p.rchan
-	return snd, rcv
-}
-
-// Return a client receiver and server sender
-func (p *pairs) receiverSender() (Receiver, Sender) {
-	rcv, err := p.client.Receiver()
-	fatalIf(p.t, err)
-	snd := <-p.schan
-	return rcv, snd
-}
-
 type result struct {
 	label string
 	err   error
@@ -370,8 +171,9 @@ func doDisposition(ack <-chan Outcome, results chan result) {
 
 // Senders get credit immediately if receivers have prefetch set
 func TestSendReceivePrefetch(t *testing.T) {
-	pairs := newPairs(t, 1, true)
-	s, r := pairs.senderReceiver()
+	p := newPipe(t, nil, nil)
+	p.prefetch = true
+	s, r := p.sender()
 	s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
 	if _, err := r.Receive(); err != nil {
 		t.Error(err)
@@ -380,8 +182,9 @@ func TestSendReceivePrefetch(t *testing.T) {
 
 // Senders do not get credit till Receive() if receivers don't have prefetch
 func TestSendReceiveNoPrefetch(t *testing.T) {
-	pairs := newPairs(t, 1, false)
-	s, r := pairs.senderReceiver()
+	p := newPipe(t, nil, nil)
+	p.prefetch = false
+	s, r := p.sender()
 	done := make(chan struct{}, 1)
 	go func() {
 		s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
@@ -402,14 +205,14 @@ func TestSendReceiveNoPrefetch(t *testing.T) {
 // Test that closing Links interrupts blocked link functions.
 func TestLinkCloseInterrupt(t *testing.T) {
 	want := amqp.Error{Name: "x", Description: "all bad"}
-	pairs := newPairs(t, 1, false)
+	p := newPipe(t, nil, nil)
 	results := make(chan result) // Collect expected errors
 
 	// Note closing the link does not interrupt Send() calls, the AMQP spec says
 	// that deliveries can be settled after the link is closed.
 
 	// Receiver.Close() interrupts Receive()
-	snd, rcv := pairs.senderReceiver()
+	snd, rcv := p.sender()
 	go doReceive(rcv, results)
 	rcv.Close(want)
 	if r := <-results; want != r.err {
@@ -417,7 +220,7 @@ func TestLinkCloseInterrupt(t *testing.T) {
 	}
 
 	// Remote Sender.Close() interrupts Receive()
-	snd, rcv = pairs.senderReceiver()
+	snd, rcv = p.sender()
 	go doReceive(rcv, results)
 	snd.Close(want)
 	if r := <-results; want != r.err {
@@ -428,27 +231,28 @@ func TestLinkCloseInterrupt(t *testing.T) {
 // Test closing the server end of a connection.
 func TestConnectionCloseInterrupt1(t *testing.T) {
 	want := amqp.Error{Name: "x", Description: "bad"}
-	pairs := newPairs(t, 1, true)
+	p := newSocketPair(t, nil, nil)
+	p.prefetch = true
 	results := make(chan result) // Collect expected errors
 
 	// Connection.Close() interrupts Send, Receive, Disposition.
-	snd, rcv := pairs.senderReceiver()
+	snd, rcv := p.sender()
 	go doSend(snd, results)
 
 	if _, err := rcv.Receive(); err != nil {
 		t.Error("receive", err)
 	}
-	rcv, snd = pairs.receiverSender()
+	rcv, snd = p.receiver()
 	go doReceive(rcv, results)
 
-	snd, rcv = pairs.senderReceiver()
+	snd, rcv = p.sender()
 	ack := snd.SendWaitable(amqp.NewMessage())
 	if _, err := rcv.Receive(); err != nil {
 		t.Error("receive", err)
 	}
 	go doDisposition(ack, results)
 
-	pairs.server.Close(want)
+	p.server.Close(want)
 	for i := 0; i < 3; i++ {
 		if r := <-results; want != r.err {
 			t.Errorf("want %v got %v", want, r)
@@ -459,24 +263,25 @@ func TestConnectionCloseInterrupt1(t *testing.T) {
 // Test closing the client end of the connection.
 func TestConnectionCloseInterrupt2(t *testing.T) {
 	want := amqp.Error{Name: "x", Description: "bad"}
-	pairs := newPairs(t, 1, true)
+	p := newSocketPair(t, nil, nil)
+	p.prefetch = true
 	results := make(chan result) // Collect expected errors
 
 	// Connection.Close() interrupts Send, Receive, Disposition.
-	snd, rcv := pairs.senderReceiver()
+	snd, rcv := p.sender()
 	go doSend(snd, results)
 	if _, err := rcv.Receive(); err != nil {
 		t.Error("receive", err)
 	}
 
-	rcv, snd = pairs.receiverSender()
+	rcv, snd = p.receiver()
 	go doReceive(rcv, results)
 
-	snd, rcv = pairs.senderReceiver()
+	snd, rcv = p.sender()
 	ack := snd.SendWaitable(amqp.NewMessage())
 	go doDisposition(ack, results)
 
-	pairs.client.Connection().Close(want)
+	p.client.Connection().Close(want)
 	for i := 0; i < 3; i++ {
 		if r := <-results; want != r.err {
 			t.Errorf("want %v got %v", want, r.err)
@@ -484,63 +289,43 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
 	}
 }
 
-func heartbeat(c Connection) time.Duration {
-	return c.(*connection).engine.Transport().RemoteIdleTimeout()
-}
-
 func TestHeartbeat(t *testing.T) {
-	client, server := newClientServerOpts(t,
-		[]ConnectionOption{Heartbeat(102 * time.Millisecond)},
-		nil)
-	defer closeClientServer(client, server)
-
-	var serverHeartbeat time.Duration
-
-	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingConnection:
-				serverHeartbeat = in.Heartbeat()
-				in.AcceptConnection(Heartbeat(101 * time.Millisecond))
-			default:
-				in.Accept()
-			}
-		}
-	}()
+	p := newSocketPair(t,
+		[]ConnectionOption{Heartbeat(12 * time.Millisecond)},
+		[]ConnectionOption{Heartbeat(11 * time.Millisecond)})
+	defer func() { p.close() }()
 
-	// Freeze the server to stop it sending heartbeats.
+	// Function to freeze the server to stop it sending heartbeats.
 	unfreeze := make(chan bool)
 	defer close(unfreeze)
-	freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) }
+	freeze := func() error { return p.server.(*connection).engine.Inject(func() { <-unfreeze }) }
 
-	fatalIf(t, client.Sync())
-	errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection())))
-	errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat))
-	errorIf(t, client.Connection().Error())
+	fatalIf(t, p.client.Sync())
+	errorIf(t, checkEqual(11*time.Millisecond, p.client.Connection().Heartbeat()))
+	errorIf(t, checkEqual(12*time.Millisecond, p.server.Heartbeat()))
 
 	// Freeze the server for less than a heartbeat
 	fatalIf(t, freeze())
-	time.Sleep(50 * time.Millisecond)
+	time.Sleep(5 * time.Millisecond)
 	unfreeze <- true
 	// Make sure server is still responding.
-	s, err := client.Sender()
-	errorIf(t, err)
+	s, _ := p.sender()
 	errorIf(t, s.Sync())
 
-	// Freeze the server till the client times out the connection
+	// Freeze the server till the p.client times out the connection
 	fatalIf(t, freeze())
 	select {
-	case <-client.Done():
-		if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name {
-			t.Error("bad timeout error:", client.Error())
+	case <-p.client.Done():
+		if amqp.ResourceLimitExceeded != p.client.Error().(amqp.Error).Name {
+			t.Error("bad timeout error:", p.client.Error())
 		}
 	case <-time.After(400 * time.Millisecond):
 		t.Error("connection failed to time out")
 	}
 
 	unfreeze <- true // Unfreeze the server
-	<-server.Done()
-	if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name {
-		t.Error("bad timeout error:", server.Error())
+	<-p.server.Done()
+	if amqp.ResourceLimitExceeded != p.server.Error().(amqp.Error).Name {
+		t.Error("bad timeout error:", p.server.Error())
 	}
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[07/12] qpid-proton git commit: PROTON-1955: [go] incorrect conversion between Go time and AMQP time

Posted by ac...@apache.org.
PROTON-1955: [go] incorrect conversion between Go time and AMQP time


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ccaeaa0c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ccaeaa0c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ccaeaa0c

Branch: refs/heads/master
Commit: ccaeaa0cbcc1364b3265984ef15f5d92c6087793
Parents: 32c7036
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Oct 5 15:38:48 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:14:15 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/marshal.go    |  2 +-
 go/src/qpid.apache.org/amqp/types.go      | 26 ++++++++++++++++--------
 go/src/qpid.apache.org/amqp/types_test.go | 28 +++++++++++++++++++++++++-
 go/src/qpid.apache.org/amqp/unmarshal.go  |  4 ++--
 4 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccaeaa0c/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/marshal.go b/go/src/qpid.apache.org/amqp/marshal.go
index 99584a2..d846993 100644
--- a/go/src/qpid.apache.org/amqp/marshal.go
+++ b/go/src/qpid.apache.org/amqp/marshal.go
@@ -224,7 +224,7 @@ func marshal(i interface{}, data *C.pn_data_t) {
 
 		// Other simple types
 	case time.Time:
-		C.pn_data_put_timestamp(data, C.pn_timestamp_t(v.UnixNano()/1000))
+		C.pn_data_put_timestamp(data, pnTime(v))
 	case UUID:
 		C.pn_data_put_uuid(data, *(*C.pn_uuid_t)(unsafe.Pointer(&v[0])))
 	case Char:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccaeaa0c/go/src/qpid.apache.org/amqp/types.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/types.go b/go/src/qpid.apache.org/amqp/types.go
index a1fe2ac..04226ff 100644
--- a/go/src/qpid.apache.org/amqp/types.go
+++ b/go/src/qpid.apache.org/amqp/types.go
@@ -163,19 +163,29 @@ func (l List) GoString() string {
 }
 
 // pnTime converts Go time.Time to Proton millisecond Unix time.
+// Take care to convert zero values to zero values.
 func pnTime(t time.Time) C.pn_timestamp_t {
-	secs := t.Unix()
-	// Note: sub-second accuracy is not guaranteed if the Unix time in
-	// nanoseconds cannot be represented by an int64 (sometime around year 2260)
-	msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
-	return C.pn_timestamp_t(secs*1000 + msecs)
+	if t.IsZero() {
+		return C.pn_timestamp_t(0)
+	}
+	return C.pn_timestamp_t(t.UnixNano() / int64(time.Millisecond))
 }
 
 // goTime converts a pn_timestamp_t to a Go time.Time.
+// Take care to convert zero values to zero values.
 func goTime(t C.pn_timestamp_t) time.Time {
-	secs := int64(t) / 1000
-	nsecs := (int64(t) % 1000) * int64(time.Millisecond)
-	return time.Unix(secs, nsecs)
+	if t == 0 {
+		return time.Time{}
+	}
+	return time.Unix(0, int64(t)*int64(time.Millisecond))
+}
+
+func pnDuration(d time.Duration) C.pn_millis_t {
+	return (C.pn_millis_t)(d / (time.Millisecond))
+}
+
+func goDuration(d C.pn_millis_t) time.Duration {
+	return time.Duration(d) * time.Millisecond
 }
 
 func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccaeaa0c/go/src/qpid.apache.org/amqp/types_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/types_test.go b/go/src/qpid.apache.org/amqp/types_test.go
index 994a3cd..d7e54f1 100644
--- a/go/src/qpid.apache.org/amqp/types_test.go
+++ b/go/src/qpid.apache.org/amqp/types_test.go
@@ -28,7 +28,7 @@ import (
 
 func checkEqual(want interface{}, got interface{}) error {
 	if !reflect.DeepEqual(want, got) {
-		return fmt.Errorf("%#v != %#v", want, got)
+		return fmt.Errorf("(%T)%#v != (%T)%#v", want, want, got, got)
 	}
 	return nil
 }
@@ -200,3 +200,29 @@ func TestDescribed(t *testing.T) {
 		t.Error(err)
 	}
 }
+
+func TestTimeConversion(t *testing.T) {
+	pt := pnTime(timeValue)
+	if err := checkEqual(timeValue, goTime(pt)); err != nil {
+		t.Error(err)
+	}
+	if err := checkEqual(pt, pnTime(goTime(pt))); err != nil {
+		t.Error(err)
+	}
+	ut := time.Unix(123, 456*1000000)
+	if err := checkEqual(123456, int(pnTime(ut))); err != nil {
+		t.Error(err)
+	}
+	if err := checkEqual(ut, goTime(123456)); err != nil {
+		t.Error(err)
+	}
+
+	// Preserve zero values
+	var tz time.Time
+	if err := checkEqual(0, int(pnTime(tz))); err != nil {
+		t.Error(err)
+	}
+	if err := checkEqual(tz, goTime(pnTime(tz))); err != nil {
+		t.Error(err)
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccaeaa0c/go/src/qpid.apache.org/amqp/unmarshal.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/unmarshal.go b/go/src/qpid.apache.org/amqp/unmarshal.go
index 2c6e3f1..054f115 100644
--- a/go/src/qpid.apache.org/amqp/unmarshal.go
+++ b/go/src/qpid.apache.org/amqp/unmarshal.go
@@ -487,7 +487,7 @@ func unmarshal(v interface{}, data *C.pn_data_t) {
 
 	case *time.Time:
 		panicUnless(pnType == C.PN_TIMESTAMP, data, v)
-		*v = time.Unix(0, int64(C.pn_data_get_timestamp(data))*1000)
+		*v = goTime(C.pn_data_get_timestamp(data))
 
 	case *UUID:
 		panicUnless(pnType == C.PN_UUID, data, v)
@@ -566,7 +566,7 @@ func getInterface(data *C.pn_data_t, vp *interface{}) {
 	case C.PN_SYMBOL:
 		*vp = Symbol(goString(C.pn_data_get_symbol(data)))
 	case C.PN_TIMESTAMP:
-		*vp = time.Unix(0, int64(C.pn_data_get_timestamp(data))*1000)
+		*vp = goTime(C.pn_data_get_timestamp(data))
 	case C.PN_UUID:
 		var u UUID
 		unmarshal(&u, data)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[09/12] qpid-proton git commit: NO-JIRA: Map Go errors to amqp::internal-error

Posted by ac...@apache.org.
NO-JIRA: Map Go errors to amqp::internal-error

Previously using go error type name which is not useful, and often empty.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/84844a95
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/84844a95
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/84844a95

Branch: refs/heads/master
Commit: 84844a95c37d8e6dcdc397b9974769e07980e8ae
Parents: ccaeaa0
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Oct 5 17:05:16 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:14:20 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/error.go      | 11 +++++++----
 go/src/qpid.apache.org/proton/wrappers.go | 17 ++++++-----------
 2 files changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/84844a95/go/src/qpid.apache.org/amqp/error.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/error.go b/go/src/qpid.apache.org/amqp/error.go
index 3a178b2..a8974cb 100644
--- a/go/src/qpid.apache.org/amqp/error.go
+++ b/go/src/qpid.apache.org/amqp/error.go
@@ -24,7 +24,6 @@ import "C"
 
 import (
 	"fmt"
-	"reflect"
 )
 
 // Error is an AMQP error condition. It has a name and a description.
@@ -46,10 +45,14 @@ func Errorf(name, format string, arg ...interface{}) Error {
 	return Error{name, fmt.Sprintf(format, arg...)}
 }
 
-// MakeError makes an AMQP error from a go error using the Go error type as the name
-// and the err.Error() string as the description.
+// MakeError makes an AMQP error from a go error: {Name: InternalError, Description: err.Error()}
+// If err is already an amqp.Error it is returned unchanged.
 func MakeError(err error) Error {
-	return Error{reflect.TypeOf(err).Name(), err.Error()}
+	if amqpErr, ok := err.(Error); ok {
+		return amqpErr
+	} else {
+		return Error{InternalError, err.Error()}
+	}
 }
 
 var (

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/84844a95/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/wrappers.go b/go/src/qpid.apache.org/proton/wrappers.go
index a7b7fb2..42b2a23 100644
--- a/go/src/qpid.apache.org/proton/wrappers.go
+++ b/go/src/qpid.apache.org/proton/wrappers.go
@@ -37,10 +37,10 @@ import "C"
 
 import (
 	"fmt"
-	"qpid.apache.org/amqp"
-	"reflect"
 	"time"
 	"unsafe"
+
+	"qpid.apache.org/amqp"
 )
 
 // TODO aconway 2015-05-05: Documentation for generated types.
@@ -379,17 +379,12 @@ func (c Condition) Error() error {
 	return amqp.Error{Name: c.Name(), Description: c.Description()}
 }
 
-// Set a Go error into a condition.
-// If it is not an amqp.Condition use the error type as name, error string as description.
+// Set a Go error into a condition, converting to an amqp.Error using amqp.MakeError
 func (c Condition) SetError(err error) {
 	if err != nil {
-		if cond, ok := err.(amqp.Error); ok {
-			c.SetName(cond.Name)
-			c.SetDescription(cond.Description)
-		} else {
-			c.SetName(reflect.TypeOf(err).Name())
-			c.SetDescription(err.Error())
-		}
+		cond := amqp.MakeError(err)
+		c.SetName(cond.Name)
+		c.SetDescription(cond.Description)
 	}
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[11/12] qpid-proton git commit: PROTON-1910: [go] native Message implementation

Posted by ac...@apache.org.
PROTON-1910: [go] native Message implementation


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/886d2b93
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/886d2b93
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/886d2b93

Branch: refs/heads/master
Commit: 886d2b9349f74c02219b29990ee04278124f5224
Parents: ef716fa
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 11 15:20:23 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:20:23 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/message.go      | 521 +++++++++++++++--------
 go/src/qpid.apache.org/amqp/message_test.go |  10 +-
 2 files changed, 355 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message.go b/go/src/qpid.apache.org/amqp/message.go
index e514b26..919904c 100644
--- a/go/src/qpid.apache.org/amqp/message.go
+++ b/go/src/qpid.apache.org/amqp/message.go
@@ -35,10 +35,10 @@ package amqp
 import "C"
 
 import (
+	"bytes"
 	"fmt"
-	"runtime"
+	"reflect"
 	"time"
-	"unsafe"
 )
 
 // Message is the interface to an AMQP message.
@@ -124,11 +124,12 @@ type Message interface {
 
 	// Per-delivery annotations to provide delivery instructions.
 	// May be added or removed by intermediaries during delivery.
+	// See ApplicationProperties() for properties set by the application.
 	DeliveryAnnotations() map[AnnotationKey]interface{}
 	SetDeliveryAnnotations(map[AnnotationKey]interface{})
 
 	// Message annotations added as part of the bare message at creation, usually
-	// by an AMQP library. See ApplicationProperties() for adding application data.
+	// by an AMQP library. See ApplicationProperties() for properties set by the application.
 	MessageAnnotations() map[AnnotationKey]interface{}
 	SetMessageAnnotations(map[AnnotationKey]interface{})
 
@@ -141,15 +142,18 @@ type Message interface {
 	Inferred() bool
 	SetInferred(bool)
 
-	// Marshal a Go value into the message body. See amqp.Marshal() for details.
+	// Get the message body, using the amqp.Unmarshal() rules for interface{}
+	Body() interface{}
+
+	// Set the body using amqp.Marshal()
+	SetBody(interface{})
+
+	// Marshal a Go value into the message body, synonym for SetBody()
 	Marshal(interface{})
 
-	// Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
+	// Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal()
 	Unmarshal(interface{})
 
-	// Body value resulting from the default unmarshaling of message body as interface{}
-	Body() interface{}
-
 	// Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
 	// the message is encoded into it, otherwise a new buffer is created.
 	// Returns the buffer containing the message.
@@ -158,7 +162,7 @@ type Message interface {
 	// Decode data into this message. Overwrites an existing message content.
 	Decode(buffer []byte) error
 
-	// Clear the message contents.
+	// Clear the message contents, set all fields to the default value.
 	Clear()
 
 	// Copy the contents of another message to this one.
@@ -180,194 +184,205 @@ type Message interface {
 	String() string
 }
 
-type message struct{ pn *C.pn_message_t }
-
-func freeMessage(m *message) {
-	C.pn_message_free(m.pn)
-	m.pn = nil
-}
-
 // NewMessage creates a new message instance.
 func NewMessage() Message {
-	m := &message{C.pn_message()}
-	runtime.SetFinalizer(m, freeMessage)
+	m := &message{}
+	m.Clear()
 	return m
 }
 
-// NewMessageWith creates a message with value as the body. Equivalent to
-//     m := NewMessage(); m.Marshal(body)
+// NewMessageWith creates a message with value as the body.
 func NewMessageWith(value interface{}) Message {
 	m := NewMessage()
-	m.Marshal(value)
+	m.SetBody(value)
 	return m
 }
 
-func (m *message) Clear() { C.pn_message_clear(m.pn) }
-
-func (m *message) Copy(x Message) error {
-	if data, err := x.Encode(nil); err == nil {
-		return m.Decode(data)
-	} else {
-		return err
-	}
-}
-
-// ==== message get functions
-
-func rewindGet(data *C.pn_data_t) (v interface{}) {
-	C.pn_data_rewind(data)
-	C.pn_data_next(data)
-	unmarshal(&v, data)
-	return v
-}
-
-func (m *message) Inferred() bool  { return bool(C.pn_message_is_inferred(m.pn)) }
-func (m *message) Durable() bool   { return bool(C.pn_message_is_durable(m.pn)) }
-func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
-func (m *message) TTL() time.Duration {
-	return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
+// NewMessageCopy creates a copy of an existing message.
+func NewMessageCopy(m Message) Message {
+	m2 := NewMessage()
+	m2.Copy(m)
+	return m2
 }
-func (m *message) FirstAcquirer() bool        { return bool(C.pn_message_is_first_acquirer(m.pn)) }
-func (m *message) DeliveryCount() uint32      { return uint32(C.pn_message_get_delivery_count(m.pn)) }
-func (m *message) MessageId() interface{}     { return rewindGet(C.pn_message_id(m.pn)) }
-func (m *message) UserId() string             { return goString(C.pn_message_get_user_id(m.pn)) }
-func (m *message) Address() string            { return C.GoString(C.pn_message_get_address(m.pn)) }
-func (m *message) Subject() string            { return C.GoString(C.pn_message_get_subject(m.pn)) }
-func (m *message) ReplyTo() string            { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
-func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
-func (m *message) ContentType() string        { return C.GoString(C.pn_message_get_content_type(m.pn)) }
-func (m *message) ContentEncoding() string    { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
 
-func (m *message) ExpiryTime() time.Time {
-	return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
-}
-func (m *message) CreationTime() time.Time {
-	return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
-}
-func (m *message) GroupId() string        { return C.GoString(C.pn_message_get_group_id(m.pn)) }
-func (m *message) GroupSequence() int32   { return int32(C.pn_message_get_group_sequence(m.pn)) }
-func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
+// Reset message to all default values
+func (m *message) Clear() { *m = message{priority: 4} }
 
-func getAnnotations(data *C.pn_data_t) (v map[AnnotationKey]interface{}) {
-	if C.pn_data_size(data) > 0 {
-		C.pn_data_rewind(data)
-		C.pn_data_next(data)
-		unmarshal(&v, data)
+// Copy makes a deep copy of message x
+func (m *message) Copy(x Message) error {
+	var mc MessageCodec
+	bytes, err := mc.Encode(x, nil)
+	if err == nil {
+		err = mc.Decode(m, bytes)
 	}
-	return v
-}
+	return err
+}
+
+type message struct {
+	address               string
+	applicationProperties map[string]interface{}
+	contentEncoding       string
+	contentType           string
+	correlationId         interface{}
+	creationTime          time.Time
+	deliveryAnnotations   map[AnnotationKey]interface{}
+	deliveryCount         uint32
+	durable               bool
+	expiryTime            time.Time
+	firstAcquirer         bool
+	groupId               string
+	groupSequence         int32
+	inferred              bool
+	messageAnnotations    map[AnnotationKey]interface{}
+	messageId             interface{}
+	priority              uint8
+	replyTo               string
+	replyToGroupId        string
+	subject               string
+	ttl                   time.Duration
+	userId                string
+	body                  interface{}
+	// Keep the original data to support Unmarshal to a non-interface{} type
+	// Waste of memory, consider deprecating or making it optional.
+	pnBody *C.pn_data_t
+}
+
+// ==== message get methods
+func (m *message) Body() interface{}          { return m.body }
+func (m *message) Inferred() bool             { return m.inferred }
+func (m *message) Durable() bool              { return m.durable }
+func (m *message) Priority() uint8            { return m.priority }
+func (m *message) TTL() time.Duration         { return m.ttl }
+func (m *message) FirstAcquirer() bool        { return m.firstAcquirer }
+func (m *message) DeliveryCount() uint32      { return m.deliveryCount }
+func (m *message) MessageId() interface{}     { return m.messageId }
+func (m *message) UserId() string             { return m.userId }
+func (m *message) Address() string            { return m.address }
+func (m *message) Subject() string            { return m.subject }
+func (m *message) ReplyTo() string            { return m.replyTo }
+func (m *message) CorrelationId() interface{} { return m.correlationId }
+func (m *message) ContentType() string        { return m.contentType }
+func (m *message) ContentEncoding() string    { return m.contentEncoding }
+func (m *message) ExpiryTime() time.Time      { return m.expiryTime }
+func (m *message) CreationTime() time.Time    { return m.creationTime }
+func (m *message) GroupId() string            { return m.groupId }
+func (m *message) GroupSequence() int32       { return m.groupSequence }
+func (m *message) ReplyToGroupId() string     { return m.replyToGroupId }
 
 func (m *message) DeliveryAnnotations() map[AnnotationKey]interface{} {
-	return getAnnotations(C.pn_message_instructions(m.pn))
+	if m.deliveryAnnotations == nil {
+		m.deliveryAnnotations = make(map[AnnotationKey]interface{})
+	}
+	return m.deliveryAnnotations
 }
 func (m *message) MessageAnnotations() map[AnnotationKey]interface{} {
-	return getAnnotations(C.pn_message_annotations(m.pn))
+	if m.messageAnnotations == nil {
+		m.messageAnnotations = make(map[AnnotationKey]interface{})
+	}
+	return m.messageAnnotations
 }
-
 func (m *message) ApplicationProperties() map[string]interface{} {
-	var v map[string]interface{}
-	data := C.pn_message_properties(m.pn)
-	if C.pn_data_size(data) > 0 {
-		C.pn_data_rewind(data)
-		C.pn_data_next(data)
-		unmarshal(&v, data)
+	if m.applicationProperties == nil {
+		m.applicationProperties = make(map[string]interface{})
 	}
-	return v
+	return m.applicationProperties
 }
 
 // ==== message set methods
 
-func setData(v interface{}, data *C.pn_data_t) {
-	C.pn_data_clear(data)
-	marshal(v, data)
-}
+func (m *message) SetBody(v interface{})          { m.body = v }
+func (m *message) SetInferred(x bool)             { m.inferred = x }
+func (m *message) SetDurable(x bool)              { m.durable = x }
+func (m *message) SetPriority(x uint8)            { m.priority = x }
+func (m *message) SetTTL(x time.Duration)         { m.ttl = x }
+func (m *message) SetFirstAcquirer(x bool)        { m.firstAcquirer = x }
+func (m *message) SetDeliveryCount(x uint32)      { m.deliveryCount = x }
+func (m *message) SetMessageId(x interface{})     { m.messageId = x }
+func (m *message) SetUserId(x string)             { m.userId = x }
+func (m *message) SetAddress(x string)            { m.address = x }
+func (m *message) SetSubject(x string)            { m.subject = x }
+func (m *message) SetReplyTo(x string)            { m.replyTo = x }
+func (m *message) SetCorrelationId(x interface{}) { m.correlationId = x }
+func (m *message) SetContentType(x string)        { m.contentType = x }
+func (m *message) SetContentEncoding(x string)    { m.contentEncoding = x }
+func (m *message) SetExpiryTime(x time.Time)      { m.expiryTime = x }
+func (m *message) SetCreationTime(x time.Time)    { m.creationTime = x }
+func (m *message) SetGroupId(x string)            { m.groupId = x }
+func (m *message) SetGroupSequence(x int32)       { m.groupSequence = x }
+func (m *message) SetReplyToGroupId(x string)     { m.replyToGroupId = x }
+
+func (m *message) SetDeliveryAnnotations(x map[AnnotationKey]interface{}) {
+	m.deliveryAnnotations = x
+}
+func (m *message) SetMessageAnnotations(x map[AnnotationKey]interface{}) {
+	m.messageAnnotations = x
+}
+func (m *message) SetApplicationProperties(x map[string]interface{}) {
+	m.applicationProperties = x
+}
+
+// Marshal body from v, same as SetBody(v). See amqp.Marshal.
+func (m *message) Marshal(v interface{}) { m.body = v }
 
-func (m *message) SetInferred(b bool)  { C.pn_message_set_inferred(m.pn, C.bool(b)) }
-func (m *message) SetDurable(b bool)   { C.pn_message_set_durable(m.pn, C.bool(b)) }
-func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
-func (m *message) SetTTL(d time.Duration) {
-	C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
-}
-func (m *message) SetFirstAcquirer(b bool)     { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
-func (m *message) SetDeliveryCount(c uint32)   { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
-func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
-func (m *message) SetUserId(s string)          { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
-func (m *message) SetAddress(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
-}
-func (m *message) SetSubject(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
-}
-func (m *message) SetReplyTo(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
-}
-func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
-func (m *message) SetContentType(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
-}
-func (m *message) SetContentEncoding(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
-}
-func (m *message) SetExpiryTime(t time.Time)   { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
-func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
-func (m *message) SetGroupId(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
-}
-func (m *message) SetGroupSequence(s int32) {
-	C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
-}
-func (m *message) SetReplyToGroupId(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
+func (m *message) Unmarshal(v interface{}) {
+	// FIXME aconway 2018-09-28: this is inefficient, replace with a
+	// reflective conversion from the existing body value that respects
+	// the Unmarshal() rules.
+	pnData := C.pn_data(2)
+	marshal(m.body, pnData)
+	unmarshal(v, pnData)
 }
 
-func (m *message) SetDeliveryAnnotations(v map[AnnotationKey]interface{}) {
-	setData(v, C.pn_message_instructions(m.pn))
-}
-func (m *message) SetMessageAnnotations(v map[AnnotationKey]interface{}) {
-	setData(v, C.pn_message_annotations(m.pn))
+// Internal use only
+type MessageCodec struct {
+	pn *C.pn_message_t // Cache a pn_message_t to speed up encode/decode
+	// Optionally remember a byte buffer to use with MessageCodec methods.
+	Buffer []byte
 }
-func (m *message) SetApplicationProperties(v map[string]interface{}) {
-	setData(v, C.pn_message_properties(m.pn))
-}
-
-// Marshal body from v
-func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) }
 
-// Unmarshal body to v, which must be a pointer to a value. See amqp.Unmarshal
-func (m *message) Unmarshal(v interface{}) {
-	data := C.pn_message_body(m.pn)
-	if C.pn_data_size(data) > 0 {
-		C.pn_data_rewind(data)
-		C.pn_data_next(data)
-		unmarshal(v, data)
+func (mc *MessageCodec) pnMessage() *C.pn_message_t {
+	if mc.pn == nil {
+		mc.pn = C.pn_message()
 	}
-	return
+	return mc.pn
 }
 
-// Return the body value as an interface
-func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
-
-func (m *message) Decode(data []byte) error {
-	m.Clear()
-	if len(data) == 0 {
-		return fmt.Errorf("empty buffer for decode")
+func (mc *MessageCodec) Close() {
+	if mc.pn != nil {
+		C.pn_message_free(mc.pn)
+		mc.pn = nil
 	}
-	if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
-		return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn)))
+}
+
+func (mc *MessageCodec) Decode(m Message, data []byte) error {
+	pn := mc.pnMessage()
+	if C.pn_message_decode(pn, cPtr(data), cLen(data)) < 0 {
+		return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(pn)))
 	}
+	m.(*message).get(pn)
 	return nil
 }
 
+func (m *message) Decode(data []byte) error {
+	var mc MessageCodec
+	defer mc.Close()
+	return mc.Decode(m, data)
+}
+
 func DecodeMessage(data []byte) (m Message, err error) {
 	m = NewMessage()
 	err = m.Decode(data)
 	return
 }
 
-func (m *message) Encode(buffer []byte) ([]byte, error) {
+// Encode m using buffer. Return the final buffer used to hold m,
+// may be different if the initial buffer was not large enough.
+func (mc *MessageCodec) Encode(m Message, buffer []byte) ([]byte, error) {
+	pn := mc.pnMessage()
+	m.(*message).put(pn)
 	encode := func(buf []byte) ([]byte, error) {
 		len := cLen(buf)
-		result := C.pn_message_encode(m.pn, cPtr(buf), &len)
+		result := C.pn_message_encode(pn, cPtr(buf), &len)
 		switch {
 		case result == C.PN_OVERFLOW:
 			return buf, overflow
@@ -380,50 +395,214 @@ func (m *message) Encode(buffer []byte) ([]byte, error) {
 	return encodeGrow(buffer, encode)
 }
 
+func (m *message) Encode(buffer []byte) ([]byte, error) {
+	var mc MessageCodec
+	defer mc.Close()
+	return mc.Encode(m, buffer)
+}
+
 // TODO aconway 2015-09-14: Multi-section messages.
 
-func (m *message) String() string {
-	str := C.pn_string(C.CString(""))
-	defer C.pn_free(unsafe.Pointer(str))
-	C.pn_inspect(unsafe.Pointer(m.pn), str)
-	return C.GoString(C.pn_string_get(str))
+type ignoreFunc func(v interface{}) bool
+
+func isNil(v interface{}) bool   { return v == nil }
+func isZero(v interface{}) bool  { return v == reflect.Zero(reflect.TypeOf(v)).Interface() }
+func isEmpty(v interface{}) bool { return reflect.ValueOf(v).Len() == 0 }
+
+type stringBuilder struct {
+	bytes.Buffer
+	separator string
 }
 
-// ==== Deprecated functions
-func oldGetAnnotations(data *C.pn_data_t) (v map[string]interface{}) {
-	if C.pn_data_size(data) > 0 {
+func (b *stringBuilder) field(name string, value interface{}, ignore ignoreFunc) {
+	if !ignore(value) {
+		b.WriteString(b.separator)
+		b.separator = ", "
+		b.WriteString(name)
+		b.WriteString(": ")
+		fmt.Fprintf(&b.Buffer, "%v", value)
+	}
+}
+
+// Human-readable string describing message.
+// Includes only message fields with non-default values.
+func (m *message) String() string {
+	var b stringBuilder
+	b.WriteString("Message{")
+	b.field("address", m.address, isEmpty)
+	b.field("durable", m.durable, isZero)
+	// Priority has weird default
+	b.field("priority", m.priority, func(v interface{}) bool { return v.(uint8) == 4 })
+	b.field("ttl", m.ttl, isZero)
+	b.field("first-acquirer", m.firstAcquirer, isZero)
+	b.field("delivery-count", m.deliveryCount, isZero)
+	b.field("message-id", m.messageId, isNil)
+	b.field("user-id", m.userId, isEmpty)
+	b.field("subject", m.subject, isEmpty)
+	b.field("reply-to", m.replyTo, isEmpty)
+	b.field("correlation-id", m.correlationId, isNil)
+	b.field("content-type", m.contentType, isEmpty)
+	b.field("content-encoding", m.contentEncoding, isEmpty)
+	b.field("expiry-time", m.expiryTime, isZero)
+	b.field("creation-time", m.creationTime, isZero)
+	b.field("group-id", m.groupId, isEmpty)
+	b.field("group-sequence", m.groupSequence, isZero)
+	b.field("reply-to-group-id", m.replyToGroupId, isEmpty)
+	b.field("inferred", m.inferred, isZero)
+	b.field("delivery-annotations", m.deliveryAnnotations, isEmpty)
+	b.field("message-annotations", m.messageAnnotations, isEmpty)
+	b.field("application-properties", m.applicationProperties, isEmpty)
+	b.field("body", m.body, isNil)
+	b.WriteString("}")
+	return b.String()
+}
+
+// ==== get message from pn_message_t
+
+func getData(v interface{}, data *C.pn_data_t) {
+	if data != nil && C.pn_data_size(data) > 0 {
 		C.pn_data_rewind(data)
 		C.pn_data_next(data)
-		unmarshal(&v, data)
+		unmarshal(v, data)
+	}
+	return
+}
+
+func getString(c *C.char) string {
+	if c == nil {
+		return ""
+	}
+	return C.GoString(c)
+}
+
+func (m *message) get(pn *C.pn_message_t) {
+	m.Clear()
+	m.inferred = bool(C.pn_message_is_inferred(pn))
+	m.durable = bool(C.pn_message_is_durable(pn))
+	m.priority = uint8(C.pn_message_get_priority(pn))
+	m.ttl = goDuration(C.pn_message_get_ttl(pn))
+	m.firstAcquirer = bool(C.pn_message_is_first_acquirer(pn))
+	m.deliveryCount = uint32(C.pn_message_get_delivery_count(pn))
+	getData(&m.messageId, C.pn_message_id(pn))
+	m.userId = string(goBytes(C.pn_message_get_user_id(pn)))
+	m.address = getString(C.pn_message_get_address(pn))
+	m.subject = getString(C.pn_message_get_subject(pn))
+	m.replyTo = getString(C.pn_message_get_reply_to(pn))
+	getData(&m.correlationId, C.pn_message_correlation_id(pn))
+	m.contentType = getString(C.pn_message_get_content_type(pn))
+	m.contentEncoding = getString(C.pn_message_get_content_encoding(pn))
+	m.expiryTime = goTime(C.pn_message_get_expiry_time(pn))
+	m.creationTime = goTime(C.pn_message_get_creation_time(pn))
+	m.groupId = getString(C.pn_message_get_group_id(pn))
+	m.groupSequence = int32(C.pn_message_get_group_sequence(pn))
+	m.replyToGroupId = getString(C.pn_message_get_reply_to_group_id(pn))
+	getData(&m.deliveryAnnotations, C.pn_message_instructions(pn))
+	getData(&m.messageAnnotations, C.pn_message_annotations(pn))
+	getData(&m.applicationProperties, C.pn_message_properties(pn))
+	getData(&m.body, C.pn_message_body(pn))
+}
+
+// ==== put message to pn_message_t
+
+func putData(v interface{}, pn *C.pn_data_t) {
+	if v != nil {
+		C.pn_data_clear(pn)
+		marshal(v, pn)
+	}
+}
+
+// For pointer-based fields (pn_data_t, strings, bytes) only
+// put a field if it has a non-empty value
+func (m *message) put(pn *C.pn_message_t) {
+	C.pn_message_clear(pn)
+	C.pn_message_set_inferred(pn, C.bool(m.inferred))
+	C.pn_message_set_durable(pn, C.bool(m.durable))
+	C.pn_message_set_priority(pn, C.uint8_t(m.priority))
+	C.pn_message_set_ttl(pn, pnDuration(m.ttl))
+	C.pn_message_set_first_acquirer(pn, C.bool(m.firstAcquirer))
+	C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount))
+	putData(m.messageId, C.pn_message_id(pn))
+	if m.userId != "" {
+		C.pn_message_set_user_id(pn, pnBytes(([]byte)(m.userId)))
+	}
+	if m.address != "" {
+		C.pn_message_set_address(pn, C.CString(m.address))
+	}
+	if m.subject != "" {
+		C.pn_message_set_subject(pn, C.CString(m.subject))
+	}
+	if m.replyTo != "" {
+		C.pn_message_set_reply_to(pn, C.CString(m.replyTo))
+	}
+	putData(m.correlationId, C.pn_message_correlation_id(pn))
+	if m.contentType != "" {
+		C.pn_message_set_content_type(pn, C.CString(m.contentType))
+	}
+	if m.contentEncoding != "" {
+		C.pn_message_set_content_encoding(pn, C.CString(m.contentEncoding))
+	}
+	C.pn_message_set_expiry_time(pn, pnTime(m.expiryTime))
+	C.pn_message_set_creation_time(pn, pnTime(m.creationTime))
+	if m.groupId != "" {
+		C.pn_message_set_group_id(pn, C.CString(m.groupId))
 	}
-	return v
+	C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.groupSequence))
+	if m.replyToGroupId != "" {
+		C.pn_message_set_reply_to_group_id(pn, C.CString(m.replyToGroupId))
+	}
+	if len(m.deliveryAnnotations) != 0 {
+		putData(m.deliveryAnnotations, C.pn_message_instructions(pn))
+	}
+	if len(m.messageAnnotations) != 0 {
+		putData(m.messageAnnotations, C.pn_message_annotations(pn))
+	}
+	if len(m.applicationProperties) != 0 {
+		putData(m.applicationProperties, C.pn_message_properties(pn))
+	}
+	putData(m.body, C.pn_message_body(pn))
+}
+
+// ==== Deprecated functions
+
+func oldAnnotations(in map[AnnotationKey]interface{}) (out map[string]interface{}) {
+	if len(in) == 0 {
+		return nil
+	}
+	out = make(map[string]interface{})
+	for k, v := range in {
+		out[k.String()] = v
+	}
+	return
 }
 
 func (m *message) Instructions() map[string]interface{} {
-	return oldGetAnnotations(C.pn_message_instructions(m.pn))
+	return oldAnnotations(m.deliveryAnnotations)
 }
 func (m *message) Annotations() map[string]interface{} {
-	return oldGetAnnotations(C.pn_message_annotations(m.pn))
+	return oldAnnotations(m.messageAnnotations)
 }
 func (m *message) Properties() map[string]interface{} {
-	return oldGetAnnotations(C.pn_message_properties(m.pn))
+	return m.applicationProperties
 }
 
 // Convert old string-keyed annotations to an AnnotationKey map
-func fixAnnotations(old map[string]interface{}) (annotations map[AnnotationKey]interface{}) {
-	annotations = make(map[AnnotationKey]interface{})
-	for k, v := range old {
-		annotations[AnnotationKeyString(k)] = v
+func newAnnotations(in map[string]interface{}) (out map[AnnotationKey]interface{}) {
+	if len(in) == 0 {
+		return nil
+	}
+	out = make(map[AnnotationKey]interface{})
+	for k, v := range in {
+		out[AnnotationKeyString(k)] = v
 	}
 	return
 }
 
 func (m *message) SetInstructions(v map[string]interface{}) {
-	setData(fixAnnotations(v), C.pn_message_instructions(m.pn))
+	m.deliveryAnnotations = newAnnotations(v)
 }
 func (m *message) SetAnnotations(v map[string]interface{}) {
-	setData(fixAnnotations(v), C.pn_message_annotations(m.pn))
+	m.messageAnnotations = newAnnotations(v)
 }
 func (m *message) SetProperties(v map[string]interface{}) {
-	setData(fixAnnotations(v), C.pn_message_properties(m.pn))
+	m.applicationProperties = v
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go
index b22c60d..668ca49 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -58,14 +58,14 @@ func TestDefaultMessage(t *testing.T) {
 		{"ReplyToGroupId", ""},
 		{"MessageId", nil},
 		{"CorrelationId", nil},
-		{"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
-		{"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
-		{"ApplicationProperties", map[string]interface{}(nil)},
+		{"DeliveryAnnotations", map[AnnotationKey]interface{}{}},
+		{"MessageAnnotations", map[AnnotationKey]interface{}{}},
+		{"ApplicationProperties", map[string]interface{}{}},
 
 		// Deprecated
 		{"Instructions", map[string]interface{}(nil)},
 		{"Annotations", map[string]interface{}(nil)},
-		{"Properties", map[string]interface{}(nil)},
+		{"Properties", map[string]interface{}{}},
 		{"Body", nil},
 	} {
 		ret := mv.MethodByName(x.method).Call(nil)
@@ -91,7 +91,7 @@ func TestMessageString(t *testing.T) {
 	if err := roundTrip(m); err != nil {
 		t.Error(err)
 	}
-	msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
+	msgstr := "Message{user-id: user, delivery-annotations: map[instructions:foo], message-annotations: map[annotations:bar], application-properties: map[int:32], body: hello}"
 	if err := checkEqual(msgstr, m.String()); err != nil {
 		t.Error(err)
 	}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[04/12] qpid-proton git commit: NO-JIRA: Fix go vet -v warnings, minor typos

Posted by ac...@apache.org.
NO-JIRA: Fix go vet -v warnings, minor typos


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f46076e4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f46076e4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f46076e4

Branch: refs/heads/master
Commit: f46076e4c19f05584f1724ef52930821c8d66c38
Parents: 55b2735
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Oct 5 15:38:14 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:33:08 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/types_test.go      |  2 +-
 go/src/qpid.apache.org/electron/common_test.go |  2 +-
 go/src/qpid.apache.org/electron/connection.go  |  5 ++-
 go/src/qpid.apache.org/electron/error.go       | 35 ---------------------
 go/src/qpid.apache.org/electron/receiver.go    |  8 +++--
 go/src/qpid.apache.org/proton/error.go         | 13 +-------
 6 files changed, 12 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/amqp/types_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/types_test.go b/go/src/qpid.apache.org/amqp/types_test.go
index 8f7cd6a..994a3cd 100644
--- a/go/src/qpid.apache.org/amqp/types_test.go
+++ b/go/src/qpid.apache.org/amqp/types_test.go
@@ -44,7 +44,7 @@ func checkUnmarshal(marshaled []byte, v interface{}) error {
 	return nil
 }
 
-func ExampleKey() {
+func ExampleAnnotationKey() {
 	var k AnnotationKey = AnnotationKeySymbol(Symbol("foo"))
 	fmt.Println(k.Get().(Symbol))
 	k = AnnotationKeyUint64(42)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/electron/common_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go
index e2e4a2a..59fbfff 100644
--- a/go/src/qpid.apache.org/electron/common_test.go
+++ b/go/src/qpid.apache.org/electron/common_test.go
@@ -47,7 +47,7 @@ func fatalIf(t testing.TB, err error) {
 
 func errorIf(t testing.TB, err error) {
 	if err != nil {
-		t.Errorf(decorate(err, 1))
+		t.Error(decorate(err, 1))
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go
index 628d933..48cba60 100644
--- a/go/src/qpid.apache.org/electron/connection.go
+++ b/go/src/qpid.apache.org/electron/connection.go
@@ -25,6 +25,7 @@ import "C"
 import (
 	"crypto/rand"
 	"encoding/hex"
+	"fmt"
 	"net"
 	"sync"
 	"time"
@@ -310,7 +311,9 @@ func (c *connection) WaitTimeout(timeout time.Duration) error {
 }
 
 func (c *connection) Incoming() <-chan Incoming {
-	assert(c.incoming != nil, "Incoming() is only allowed for a Connection created with the Server() option: %s", c)
+	if c.incoming == nil {
+		panic(fmt.Errorf("Incoming() only allowed on Connection created with the Server() option: %s", c))
+	}
 	return c.incoming
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/electron/error.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/error.go b/go/src/qpid.apache.org/electron/error.go
deleted file mode 100644
index 4dcfd94..0000000
--- a/go/src/qpid.apache.org/electron/error.go
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package electron
-
-import (
-	"fmt"
-)
-
-// assert panics if condition is false with optional formatted message
-func assert(condition bool, format ...interface{}) {
-	if !condition {
-		if len(format) > 0 {
-			panic(fmt.Errorf(format[0].(string), format[1:]...))
-		} else {
-			panic(fmt.Errorf("assertion failed"))
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/receiver.go b/go/src/qpid.apache.org/electron/receiver.go
index 26b46a8..8e3b1a7 100644
--- a/go/src/qpid.apache.org/electron/receiver.go
+++ b/go/src/qpid.apache.org/electron/receiver.go
@@ -21,9 +21,10 @@ package electron
 
 import (
 	"fmt"
+	"time"
+
 	"qpid.apache.org/amqp"
 	"qpid.apache.org/proton"
-	"time"
 )
 
 // Receiver is a Link that receives messages.
@@ -126,7 +127,9 @@ func (r *receiver) Receive() (rm ReceivedMessage, err error) {
 }
 
 func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
-	assert(r.buffer != nil, "Receiver is not open: %s", r)
+	if r.buffer == nil {
+		panic(fmt.Errorf("Receiver is not open: %s", r))
+	}
 	if !r.prefetch { // Per-caller flow control
 		select { // Check for immediate availability, avoid caller() inject
 		case rm2, ok := <-r.buffer:
@@ -164,7 +167,6 @@ func (r *receiver) message(delivery proton.Delivery) {
 			localClose(r.pLink, err)
 			return
 		}
-		assert(m != nil)
 		r.pLink.Advance()
 		if r.pLink.Credit() < 0 {
 			localClose(r.pLink, fmt.Errorf("received message in excess of credit limit"))

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/proton/error.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/error.go b/go/src/qpid.apache.org/proton/error.go
index 5232fec..a6fb026 100644
--- a/go/src/qpid.apache.org/proton/error.go
+++ b/go/src/qpid.apache.org/proton/error.go
@@ -71,7 +71,7 @@ type ErrorHolder struct {
 	value atomic.Value
 }
 
-// Set the error if not already set, return the error in the Holder.
+// Set the error if not already set
 func (e *ErrorHolder) Set(err error) {
 	if err != nil {
 		e.once.Do(func() { e.value.Store(err) })
@@ -83,14 +83,3 @@ func (e *ErrorHolder) Get() (err error) {
 	err, _ = e.value.Load().(error)
 	return
 }
-
-// assert panics if condition is false with optional formatted message
-func assert(condition bool, format ...interface{}) {
-	if !condition {
-		if len(format) > 0 {
-			panic(fmt.Errorf(format[0].(string), format[1:]...))
-		} else {
-			panic(fmt.Errorf("assertion failed"))
-		}
-	}
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[06/12] qpid-proton git commit: PROTON-1952: [go] Server connection fails to authenticate

Posted by ac...@apache.org.
PROTON-1952: [go] Server connection fails to authenticate

Problem: If the Server() option came after SASL options in the option list, the
server hangs in authentication.

Solution: NewConnection panics if there is a Server() option that is not the first
option in the list.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/55b27351
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/55b27351
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/55b27351

Branch: refs/heads/master
Commit: 55b27351de92258cb4e8d7ba32701b6c0ba93a3c
Parents: c249107
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Oct 5 12:02:28 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:33:08 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/electron/common_test.go | 32 +++++++++---------
 go/src/qpid.apache.org/electron/connection.go  | 37 +++++++++++++++------
 go/src/qpid.apache.org/electron/container.go   |  5 +--
 3 files changed, 46 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/55b27351/go/src/qpid.apache.org/electron/common_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go
index 3aad825..e2e4a2a 100644
--- a/go/src/qpid.apache.org/electron/common_test.go
+++ b/go/src/qpid.apache.org/electron/common_test.go
@@ -25,6 +25,7 @@ import (
 	"path"
 	"reflect"
 	"runtime"
+	"sync"
 	"testing"
 )
 
@@ -69,16 +70,12 @@ type pair struct {
 	auth     connectionSettings
 }
 
-func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []ConnectionOption) *pair {
-	opts := append([]ConnectionOption{Server()}, serverOpts...)
-	sc, _ := NewConnection(srv, opts...)
-	opts = append([]ConnectionOption{}, clientOpts...)
-	cc, _ := NewConnection(cli, opts...)
-	cs, _ := cc.Session()
+func newPair(t testing.TB, cli, srv Connection) *pair {
+	cs, _ := cli.Session()
 	p := &pair{
 		t:        t,
 		client:   cs,
-		server:   sc,
+		server:   srv,
 		capacity: 100,
 		rchan:    make(chan Receiver),
 		schan:    make(chan Sender)}
@@ -107,26 +104,31 @@ func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []Connectio
 // AMQP pair linked by in-memory pipe
 func newPipe(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
 	cli, srv := net.Pipe()
-	return newPair(t, cli, srv, clientOpts, serverOpts)
+	opts := []ConnectionOption{Server(), ContainerId(t.Name() + "-server")}
+	sc, _ := NewConnection(srv, append(opts, serverOpts...)...)
+	opts = []ConnectionOption{ContainerId(t.Name() + "-client")}
+	cc, _ := NewConnection(cli, append(opts, clientOpts...)...)
+	return newPair(t, cc, sc)
 }
 
 // AMQP pair linked by TCP socket
 func newSocketPair(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
 	l, err := net.Listen("tcp4", ":0") // For systems with ipv6 disabled
 	fatalIfN(t, err, 1)
-	srvCh := make(chan net.Conn)
+	var srv Connection
 	var srvErr error
+	var wg sync.WaitGroup
+	wg.Add(1)
 	go func() {
-		var c net.Conn
-		c, srvErr = l.Accept()
-		srvCh <- c
+		defer wg.Done()
+		srv, srvErr = NewContainer(t.Name()+"-server").Accept(l, serverOpts...)
 	}()
 	addr := l.Addr()
-	cli, err := net.Dial(addr.Network(), addr.String())
+	cli, err := NewContainer(t.Name()+"-client").Dial(addr.Network(), addr.String(), clientOpts...)
 	fatalIfN(t, err, 1)
-	srv := <-srvCh
+	wg.Wait()
 	fatalIfN(t, srvErr, 1)
-	return newPair(t, cli, srv, clientOpts, serverOpts)
+	return newPair(t, cli, srv)
 }
 
 func (p *pair) close() { p.client.Connection().Close(nil); p.server.Close(nil) }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/55b27351/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go
index 0d5e7c5..628d933 100644
--- a/go/src/qpid.apache.org/electron/connection.go
+++ b/go/src/qpid.apache.org/electron/connection.go
@@ -26,9 +26,10 @@ import (
 	"crypto/rand"
 	"encoding/hex"
 	"net"
-	"qpid.apache.org/proton"
 	"sync"
 	"time"
+
+	"qpid.apache.org/proton"
 )
 
 // Settings associated with a Connection.
@@ -147,7 +148,7 @@ func Password(password []byte) ConnectionOption {
 // net.Listener.Accept()
 //
 func Server() ConnectionOption {
-	return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) }
+	return func(c *connection) { c.setServer() }
 }
 
 // AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
@@ -174,13 +175,13 @@ type connection struct {
 
 	defaultSessionOnce, closeOnce sync.Once
 
-	container   *container
-	conn        net.Conn
-	server      bool
-	incoming    chan Incoming
-	handler     *handler
-	engine      *proton.Engine
-	pConnection proton.Connection
+	container      *container
+	conn           net.Conn
+	server, client bool
+	incoming       chan Incoming
+	handler        *handler
+	engine         *proton.Engine
+	pConnection    proton.Connection
 
 	defaultSession Session
 }
@@ -198,8 +199,13 @@ func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)
 		return nil, err
 	}
 	c.pConnection = c.engine.Connection()
-	for _, set := range opts {
-		set(c)
+	for _, opt := range opts {
+		opt(c)
+		// If the first option is not Server(), then we are a client.
+		// Applying Server() after other options is an error
+		if !c.server {
+			c.client = true
+		}
 	}
 	if c.container == nil {
 		// Generate a random container-id. Not an RFC4122-compliant UUID but probably-unique
@@ -216,6 +222,15 @@ func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)
 	return c, nil
 }
 
+func (c *connection) setServer() {
+	if c.client {
+		panic("electron.Server() must be first in the ConnectionOption list")
+	}
+	c.server = true
+	c.engine.Server()
+	AllowIncoming()(c)
+}
+
 func (c *connection) run() {
 	if !c.server {
 		c.pConnection.Open()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/55b27351/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/container.go b/go/src/qpid.apache.org/electron/container.go
index 7c19aa5..9990706 100644
--- a/go/src/qpid.apache.org/electron/container.go
+++ b/go/src/qpid.apache.org/electron/container.go
@@ -21,9 +21,10 @@ package electron
 
 import (
 	"net"
-	"qpid.apache.org/proton"
 	"strconv"
 	"sync/atomic"
+
+	"qpid.apache.org/proton"
 )
 
 // Container is an AMQP container, it represents a single AMQP "application"
@@ -99,7 +100,7 @@ func (cont *container) Dial(network, address string, opts ...ConnectionOption) (
 func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c Connection, err error) {
 	conn, err := l.Accept()
 	if err == nil {
-		c, err = cont.Connection(conn, append(opts, Server())...)
+		c, err = cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...)
 	}
 	return
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[08/12] qpid-proton git commit: PROTON-1956: [go] server does not close transport on unexpected disconnect

Posted by ac...@apache.org.
PROTON-1956: [go] server does not close transport on unexpected disconnect


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2a84494c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2a84494c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2a84494c

Branch: refs/heads/master
Commit: 2a84494c4befb40c674116aeba7e563e68b9537d
Parents: 84844a9
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Oct 9 16:30:05 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:14:20 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/electron/electron_test.go |  7 ++++---
 go/src/qpid.apache.org/electron/handler.go       | 10 +++++++---
 go/src/qpid.apache.org/proton/handlers.go        |  7 +++++--
 3 files changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a84494c/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/electron_test.go b/go/src/qpid.apache.org/electron/electron_test.go
index cf881e6..a648a18 100644
--- a/go/src/qpid.apache.org/electron/electron_test.go
+++ b/go/src/qpid.apache.org/electron/electron_test.go
@@ -21,9 +21,10 @@ package electron
 
 import (
 	"fmt"
-	"qpid.apache.org/amqp"
 	"testing"
 	"time"
+
+	"qpid.apache.org/amqp"
 )
 
 // Send a message one way with a client sender and server receiver, verify ack.
@@ -325,7 +326,7 @@ func TestHeartbeat(t *testing.T) {
 
 	unfreeze <- true // Unfreeze the server
 	<-p.server.Done()
-	if amqp.ResourceLimitExceeded != p.server.Error().(amqp.Error).Name {
-		t.Error("bad timeout error:", p.server.Error())
+	if p.server.Error() == nil {
+		t.Error("expected server side  time-out or connection abort error")
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a84494c/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/handler.go b/go/src/qpid.apache.org/electron/handler.go
index af1efd6..899d6a9 100644
--- a/go/src/qpid.apache.org/electron/handler.go
+++ b/go/src/qpid.apache.org/electron/handler.go
@@ -126,9 +126,13 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		h.shutdown(proton.EndpointError(e.Connection()))
 
 	case proton.MDisconnected:
-		err := e.Transport().Condition().Error()
-		if err == nil {
-			err = amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection)
+		var err error
+		if err = e.Connection().RemoteCondition().Error(); err == nil {
+			if err = e.Connection().Condition().Error(); err == nil {
+				if err = e.Transport().Condition().Error(); err == nil {
+					err = amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection)
+				}
+			}
 		}
 		h.shutdown(err)
 	}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a84494c/go/src/qpid.apache.org/proton/handlers.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/handlers.go b/go/src/qpid.apache.org/proton/handlers.go
index f101548..bbb5014 100644
--- a/go/src/qpid.apache.org/proton/handlers.go
+++ b/go/src/qpid.apache.org/proton/handlers.go
@@ -25,7 +25,6 @@ import "fmt"
 type EventHandler interface {
 	// HandleEvent is called with an event.
 	// Typically HandleEvent() is implemented as a switch on e.Type()
-	// Returning an error will stop the Engine.
 	HandleEvent(e Event)
 }
 
@@ -38,7 +37,6 @@ type EventHandler interface {
 type MessagingHandler interface {
 	// HandleMessagingEvent is called with  MessagingEvent.
 	// Typically HandleEvent() is implemented as a switch on e.Type()
-	// Returning an error will stop the Engine.
 	HandleMessagingEvent(MessagingEvent, Event)
 }
 
@@ -351,6 +349,11 @@ func (d *MessagingAdapter) HandleEvent(e Event) {
 			d.outgoing(e)
 		}
 
+	case ETransportTailClosed:
+		if !e.Connection().State().RemoteClosed() { // Unexpected transport closed
+			e.Transport().CloseHead() // Complete transport close, no connection close expected
+		}
+
 	case ETransportClosed:
 		d.mhandler.HandleMessagingEvent(MDisconnected, e)
 	}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[10/12] qpid-proton git commit: PROTON-1910: [go] proton.Link allow sending/receiving message as bytes

Posted by ac...@apache.org.
PROTON-1910: [go] proton.Link allow sending/receiving message as bytes


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ef716fa0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ef716fa0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ef716fa0

Branch: refs/heads/master
Commit: ef716fa008e709efbab6ed64edfa53f0361c3262
Parents: 2a84494
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 11 15:19:00 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:19:00 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/proton/message.go | 42 ++++++++++++++++++++-------
 1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef716fa0/go/src/qpid.apache.org/proton/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/message.go b/go/src/qpid.apache.org/proton/message.go
index fbb1d48..7c166a5 100644
--- a/go/src/qpid.apache.org/proton/message.go
+++ b/go/src/qpid.apache.org/proton/message.go
@@ -26,9 +26,10 @@ import "C"
 
 import (
 	"fmt"
-	"qpid.apache.org/amqp"
 	"strconv"
 	"sync/atomic"
+
+	"qpid.apache.org/amqp"
 )
 
 // HasMessage is true if all message data is available.
@@ -41,7 +42,23 @@ func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() && !d.Pa
 // handling an MMessage event is always a safe context to call this function.
 //
 // Will return an error if message is incomplete or not current.
-func (delivery Delivery) Message() (m amqp.Message, err error) {
+func (delivery Delivery) Message() (amqp.Message, error) {
+	var err error
+	if bytes, err := delivery.MessageBytes(); err == nil {
+		m := amqp.NewMessage()
+		err = m.Decode(bytes)
+		return m, err
+	}
+	return nil, err
+}
+
+// MessageBytes extracts the raw message bytes contained in a delivery.
+//
+// Must be called in the correct link context with this delivery as the current message,
+// handling an MMessage event is always a safe context to call this function.
+//
+// Will return an error if message is incomplete or not current.
+func (delivery Delivery) MessageBytes() ([]byte, error) {
 	if !delivery.Readable() {
 		return nil, fmt.Errorf("delivery is not readable")
 	}
@@ -53,9 +70,7 @@ func (delivery Delivery) Message() (m amqp.Message, err error) {
 	if result != len(data) {
 		return nil, fmt.Errorf("cannot receive message: %s", PnErrorCode(result))
 	}
-	m = amqp.NewMessage()
-	err = m.Decode(data)
-	return
+	return data, nil
 }
 
 // Process-wide atomic counter for generating tag names
@@ -68,15 +83,22 @@ func nextTag() string {
 // Send sends a amqp.Message over a Link.
 // Returns a Delivery that can be use to determine the outcome of the message.
 func (link Link) Send(m amqp.Message) (Delivery, error) {
+	var err error
+	if bytes, err := m.Encode(nil); err == nil {
+		if d, err := link.SendMessageBytes(bytes); err == nil {
+			return d, err
+		}
+	}
+	return Delivery{}, err
+}
+
+// SendMessageBytes sends encoded bytes of an amqp.Message over a Link.
+// Returns a Delivery that can be use to determine the outcome of the message.
+func (link Link) SendMessageBytes(bytes []byte) (Delivery, error) {
 	if !link.IsSender() {
 		return Delivery{}, fmt.Errorf("attempt to send message on receiving link")
 	}
-
 	delivery := link.Delivery(nextTag())
-	bytes, err := m.Encode(nil)
-	if err != nil {
-		return Delivery{}, fmt.Errorf("cannot send message %s", err)
-	}
 	result := link.SendBytes(bytes)
 	link.Advance()
 	if result != len(bytes) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[03/12] qpid-proton git commit: NO-JIRA: longer timeout in heartbeat test, false failures

Posted by ac...@apache.org.
NO-JIRA: longer timeout in heartbeat test, false failures


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/32c7036c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/32c7036c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/32c7036c

Branch: refs/heads/master
Commit: 32c7036cc5745d55241195e4b36e0b516fd57b2c
Parents: f46076e
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Oct 10 16:12:11 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:33:08 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/electron/electron_test.go | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/32c7036c/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/electron_test.go b/go/src/qpid.apache.org/electron/electron_test.go
index c8a51c7..cf881e6 100644
--- a/go/src/qpid.apache.org/electron/electron_test.go
+++ b/go/src/qpid.apache.org/electron/electron_test.go
@@ -291,8 +291,8 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
 
 func TestHeartbeat(t *testing.T) {
 	p := newSocketPair(t,
-		[]ConnectionOption{Heartbeat(12 * time.Millisecond)},
-		[]ConnectionOption{Heartbeat(11 * time.Millisecond)})
+		[]ConnectionOption{Heartbeat(102 * time.Millisecond)},
+		[]ConnectionOption{Heartbeat(101 * time.Millisecond)})
 	defer func() { p.close() }()
 
 	// Function to freeze the server to stop it sending heartbeats.
@@ -301,8 +301,8 @@ func TestHeartbeat(t *testing.T) {
 	freeze := func() error { return p.server.(*connection).engine.Inject(func() { <-unfreeze }) }
 
 	fatalIf(t, p.client.Sync())
-	errorIf(t, checkEqual(11*time.Millisecond, p.client.Connection().Heartbeat()))
-	errorIf(t, checkEqual(12*time.Millisecond, p.server.Heartbeat()))
+	errorIf(t, checkEqual(101*time.Millisecond, p.client.Connection().Heartbeat()))
+	errorIf(t, checkEqual(102*time.Millisecond, p.server.Heartbeat()))
 
 	// Freeze the server for less than a heartbeat
 	fatalIf(t, freeze())


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[12/12] qpid-proton git commit: PROTON-1910: [go] move message encode/decode to handler thread

Posted by ac...@apache.org.
 PROTON-1910: [go] move message encode/decode to handler thread


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4a9f3b98
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4a9f3b98
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4a9f3b98

Branch: refs/heads/master
Commit: 4a9f3b986693d61040c1db32f46355d04ce75499
Parents: 886d2b9
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 11 15:23:00 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 18:01:44 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/message.go        |   3 -
 go/src/qpid.apache.org/electron/connection.go |  10 +-
 go/src/qpid.apache.org/electron/handler.go    |  32 +++---
 go/src/qpid.apache.org/electron/link.go       |   2 +-
 go/src/qpid.apache.org/electron/receiver.go   |   7 +-
 go/src/qpid.apache.org/electron/sender.go     | 128 ++++++++++++---------
 6 files changed, 106 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message.go b/go/src/qpid.apache.org/amqp/message.go
index 919904c..e96b331 100644
--- a/go/src/qpid.apache.org/amqp/message.go
+++ b/go/src/qpid.apache.org/amqp/message.go
@@ -325,9 +325,6 @@ func (m *message) SetApplicationProperties(x map[string]interface{}) {
 func (m *message) Marshal(v interface{}) { m.body = v }
 
 func (m *message) Unmarshal(v interface{}) {
-	// FIXME aconway 2018-09-28: this is inefficient, replace with a
-	// reflective conversion from the existing body value that respects
-	// the Unmarshal() rules.
 	pnData := C.pn_data(2)
 	marshal(m.body, pnData)
 	unmarshal(v, pnData)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go
index 48cba60..464c4df 100644
--- a/go/src/qpid.apache.org/electron/connection.go
+++ b/go/src/qpid.apache.org/electron/connection.go
@@ -30,6 +30,7 @@ import (
 	"sync"
 	"time"
 
+	"qpid.apache.org/amqp"
 	"qpid.apache.org/proton"
 )
 
@@ -183,6 +184,7 @@ type connection struct {
 	handler        *handler
 	engine         *proton.Engine
 	pConnection    proton.Connection
+	mc             amqp.MessageCodec
 
 	defaultSession Session
 }
@@ -244,8 +246,12 @@ func (c *connection) run() {
 }
 
 func (c *connection) Close(err error) {
-	c.err.Set(err)
-	c.engine.Close(err)
+	c.closeOnce.Do(func() {
+		c.err.Set(err)
+		c.engine.Close(err)
+		c.mc.Close()
+	})
+
 }
 
 func (c *connection) Disconnect(err error) {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/handler.go b/go/src/qpid.apache.org/electron/handler.go
index 899d6a9..753f9d3 100644
--- a/go/src/qpid.apache.org/electron/handler.go
+++ b/go/src/qpid.apache.org/electron/handler.go
@@ -27,19 +27,19 @@ import (
 // NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
 
 type handler struct {
-	delegator    *proton.MessagingAdapter
-	connection   *connection
-	links        map[proton.Link]Endpoint
-	sentMessages map[proton.Delivery]sentMessage
-	sessions     map[proton.Session]*session
+	delegator  *proton.MessagingAdapter
+	connection *connection
+	links      map[proton.Link]Endpoint
+	sent       map[proton.Delivery]*sendable // Waiting for outcome
+	sessions   map[proton.Session]*session
 }
 
 func newHandler(c *connection) *handler {
 	h := &handler{
-		connection:   c,
-		links:        make(map[proton.Link]Endpoint),
-		sentMessages: make(map[proton.Delivery]sentMessage),
-		sessions:     make(map[proton.Session]*session),
+		connection: c,
+		links:      make(map[proton.Link]Endpoint),
+		sent:       make(map[proton.Delivery]*sendable),
+		sessions:   make(map[proton.Session]*session),
 	}
 	h.delegator = proton.NewMessagingAdapter(h)
 	// Disable auto features of MessagingAdapter, we do these ourselves.
@@ -65,15 +65,15 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		}
 
 	case proton.MSettled:
-		if sm, ok := h.sentMessages[e.Delivery()]; ok {
+		if sm, ok := h.sent[e.Delivery()]; ok {
 			d := e.Delivery().Remote()
-			sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value}
-			delete(h.sentMessages, e.Delivery())
+			sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.v}
+			delete(h.sent, e.Delivery())
 		}
 
 	case proton.MSendable:
 		if s, ok := h.links[e.Link()].(*sender); ok {
-			s.sendable()
+			s.trySend()
 		} else {
 			h.linkError(e.Link(), "no sender")
 		}
@@ -182,10 +182,10 @@ func (h *handler) sessionClosed(ps proton.Session, err error) {
 
 func (h *handler) shutdown(err error) {
 	err = h.connection.closed(err)
-	for _, sm := range h.sentMessages {
+	for _, sm := range h.sent {
 		// Don't block but ensure outcome is sent eventually.
 		if sm.ack != nil {
-			o := Outcome{Unacknowledged, err, sm.value}
+			o := Outcome{Unacknowledged, err, sm.v}
 			select {
 			case sm.ack <- o:
 			default:
@@ -193,7 +193,7 @@ func (h *handler) shutdown(err error) {
 			}
 		}
 	}
-	h.sentMessages = nil
+	h.sent = nil
 	for _, l := range h.links {
 		_ = l.closed(err)
 	}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/link.go b/go/src/qpid.apache.org/electron/link.go
index de8a995..dd974f5 100644
--- a/go/src/qpid.apache.org/electron/link.go
+++ b/go/src/qpid.apache.org/electron/link.go
@@ -146,7 +146,7 @@ const (
 	// Messages are sent already settled
 	SndSettled = SndSettleMode(proton.SndSettled)
 	// Sender can send either unsettled or settled messages.
-	SendMixed = SndSettleMode(proton.SndMixed)
+	SndMixed = SndSettleMode(proton.SndMixed)
 )
 
 // RcvSettleMode defines when the receiving end of the link settles message delivery.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/receiver.go b/go/src/qpid.apache.org/electron/receiver.go
index 8e3b1a7..eba300b 100644
--- a/go/src/qpid.apache.org/electron/receiver.go
+++ b/go/src/qpid.apache.org/electron/receiver.go
@@ -162,7 +162,12 @@ func (r *receiver) message(delivery proton.Delivery) {
 		return
 	}
 	if delivery.HasMessage() {
-		m, err := delivery.Message()
+		bytes, err := delivery.MessageBytes()
+		var m amqp.Message
+		if err == nil {
+			m = amqp.NewMessage()
+			err = r.session.connection.mc.Decode(m, bytes)
+		}
 		if err != nil {
 			localClose(r.pLink, err)
 			return

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/sender.go b/go/src/qpid.apache.org/electron/sender.go
index f46fdc4..98e6c9b 100644
--- a/go/src/qpid.apache.org/electron/sender.go
+++ b/go/src/qpid.apache.org/electron/sender.go
@@ -24,9 +24,10 @@ import "C"
 
 import (
 	"fmt"
+	"time"
+
 	"qpid.apache.org/amqp"
 	"qpid.apache.org/proton"
-	"time"
 )
 
 // Sender is a Link that sends messages.
@@ -148,54 +149,86 @@ func sentStatus(d uint64) SentStatus {
 	}
 }
 
-// Sender implementation, held by handler.
+type sendable struct {
+	m    amqp.Message
+	ack  chan<- Outcome // Channel for acknowledgement of m
+	v    interface{}    // Correlation value
+	sent chan struct{}  // Closed when m is encoded and will be sent
+}
+
+func (sm *sendable) unsent(err error) {
+	Outcome{Unsent, err, sm.v}.send(sm.ack)
+}
+
 type sender struct {
 	link
-	credit chan struct{} // Signal available credit.
+	sending []*sendable
 }
 
-func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) {
-	// wait for credit
-	if _, err := timedReceive(s.credit, t); err != nil {
-		if err == Closed && s.Error() != nil {
-			err = s.Error()
-		}
-		Outcome{Unsent, err, v}.send(ack)
+func newSender(ls linkSettings) *sender {
+	s := &sender{link: link{linkSettings: ls}}
+	s.endpoint.init(s.link.pLink.String())
+	s.handler().addLink(s.pLink, s)
+	s.link.pLink.Open()
+	return s
+}
+
+// Called in handler goroutine
+func (s *sender) startSend(sm *sendable) {
+	s.sending = append(s.sending, sm)
+	s.trySend()
+}
+
+// Called in handler goroutine
+func (s *sender) trySend() {
+	for s.pLink.Credit() > 0 && len(s.sending) > 0 {
+		sm := s.sending[0]
+		s.sending = s.sending[1:]
+		s.send(sm)
+	}
+}
+
+// Called in handler goroutine with credit > 0
+func (s *sender) send(sm *sendable) {
+	var err error
+	bytes, err := s.session.connection.mc.Encode(sm.m, nil)
+	close(sm.sent) // Safe to re-use sm.m now
+	if err != nil {
+		sm.unsent(err)
 		return
 	}
-	// Send a message in handler goroutine
-	err := s.engine().Inject(func() {
-		if s.Error() != nil {
-			Outcome{Unsent, s.Error(), v}.send(ack)
-			return
-		}
+	d, err := s.pLink.SendMessageBytes(bytes)
+	if err != nil {
+		sm.unsent(err)
+		return
+	}
+	if s.SndSettle() == SndSettled || (s.SndSettle() == SndMixed && sm.ack == nil) {
+		d.Settle()                                // Pre-settled
+		Outcome{Accepted, nil, sm.v}.send(sm.ack) // Assume accepted
+	} else {
+		// Register with handler to receive the remote outcome
+		s.handler().sent[d] = sm
+	}
+}
 
-		delivery, err2 := s.pLink.Send(m)
-		switch {
-		case err2 != nil:
-			Outcome{Unsent, err2, v}.send(ack)
-		case ack == nil || s.SndSettle() == SndSettled: // Pre-settled
-			if s.SndSettle() != SndUnsettled { // Not forced to send unsettled by link policy
-				delivery.Settle()
-			}
-			Outcome{Accepted, nil, v}.send(ack) // Assume accepted
-		default:
-			s.handler().sentMessages[delivery] = sentMessage{ack, v} // Register with handler
-		}
-		if s.pLink.Credit() > 0 { // Signal there is still credit
-			s.sendable()
+func (s *sender) timeoutSend(sm *sendable) {
+	for i, sm2 := range s.sending {
+		if sm2 == sm {
+			n := copy(s.sending[i:], s.sending[i+1:])
+			s.sending = s.sending[:i+n] // delete
+			close(sm.sent)
+			return
 		}
-	})
-	if err != nil {
-		Outcome{Unsent, err, v}.send(ack)
 	}
 }
 
-// Set credit flag if not already set. Non-blocking, any goroutine
-func (s *sender) sendable() {
-	select { // Non-blocking
-	case s.credit <- struct{}{}:
-	default:
+func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) {
+	sm := &sendable{m, ack, v, make(chan struct{})}
+	s.engine().Inject(func() { s.startSend(sm) })
+	select {
+	case <-sm.sent: // OK
+	case <-After(t): // Try to timeout sm
+		s.engine().Inject(func() { s.timeoutSend(sm) })
 	}
 }
 
@@ -244,24 +277,13 @@ func (s *sender) SendSync(m amqp.Message) Outcome {
 
 // handler goroutine
 func (s *sender) closed(err error) error {
-	close(s.credit)
+	for _, sm := range s.sending {
+		close(sm.sent)
+	}
+	s.sending = nil
 	return s.link.closed(err)
 }
 
-func newSender(ls linkSettings) *sender {
-	s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 1)}
-	s.endpoint.init(s.link.pLink.String())
-	s.handler().addLink(s.pLink, s)
-	s.link.pLink.Open()
-	return s
-}
-
-// sentMessage records a sent message on the handler.
-type sentMessage struct {
-	ack   chan<- Outcome
-	value interface{}
-}
-
 // IncomingSender is sent on the Connection.Incoming() channel when there is
 // an incoming request to open a sender link.
 type IncomingSender struct {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org