You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/10/11 22:21:34 UTC

[02/12] qpid-proton git commit: PROTON-1910: [go] test refactor and benchmarks

PROTON-1910: [go] test refactor and benchmarks

- Simplify commmon test tools, move to commmon_test.go
- Use net.Pipe for most tests, more efficient than a full network socket
- Added simple benchmarks


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d9b4b989
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d9b4b989
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d9b4b989

Branch: refs/heads/master
Commit: d9b4b9893412676e947d693a7d4e336fad4d4110
Parents: 486fbaf
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Sep 21 00:51:45 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:32:59 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/message_test.go     | 138 ++++++--
 go/src/qpid.apache.org/electron/auth_test.go    |  39 +--
 .../qpid.apache.org/electron/benchmark_test.go  | 132 ++++++++
 go/src/qpid.apache.org/electron/common_test.go  | 148 ++++++++
 .../qpid.apache.org/electron/electron_test.go   | 337 ++++---------------
 5 files changed, 459 insertions(+), 335 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go
index 663e82f..b22c60d 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -20,6 +20,7 @@ under the License.
 package amqp
 
 import (
+	"reflect"
 	"testing"
 	"time"
 )
@@ -28,9 +29,7 @@ func roundTrip(m Message) error {
 	var err error
 	if buffer, err := m.Encode(nil); err == nil {
 		if m2, err := DecodeMessage(buffer); err == nil {
-			if err = checkEqual(m, m2); err == nil {
-				err = checkEqual(m.String(), m2.String())
-			}
+			err = checkEqual(m, m2)
 		}
 	}
 	return err
@@ -38,35 +37,40 @@ func roundTrip(m Message) error {
 
 func TestDefaultMessage(t *testing.T) {
 	m := NewMessage()
+	mv := reflect.ValueOf(m)
 	// Check defaults
-	for _, data := range [][]interface{}{
-		{m.Inferred(), false},
-		{m.Durable(), false},
-		{m.Priority(), uint8(4)},
-		{m.TTL(), time.Duration(0)},
-		{m.UserId(), ""},
-		{m.Address(), ""},
-		{m.Subject(), ""},
-		{m.ReplyTo(), ""},
-		{m.ContentType(), ""},
-		{m.ContentEncoding(), ""},
-		{m.GroupId(), ""},
-		{m.GroupSequence(), int32(0)},
-		{m.ReplyToGroupId(), ""},
-		{m.MessageId(), nil},
-		{m.CorrelationId(), nil},
-		{m.DeliveryAnnotations(), map[AnnotationKey]interface{}(nil)},
-		{m.MessageAnnotations(), map[AnnotationKey]interface{}(nil)},
-		{m.ApplicationProperties(), map[string]interface{}(nil)},
+	for _, x := range []struct {
+		method string
+		want   interface{}
+	}{
+		{"Inferred", false},
+		{"Durable", false},
+		{"Priority", uint8(4)},
+		{"TTL", time.Duration(0)},
+		{"UserId", ""},
+		{"Address", ""},
+		{"Subject", ""},
+		{"ReplyTo", ""},
+		{"ContentType", ""},
+		{"ContentEncoding", ""},
+		{"GroupId", ""},
+		{"GroupSequence", int32(0)},
+		{"ReplyToGroupId", ""},
+		{"MessageId", nil},
+		{"CorrelationId", nil},
+		{"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
+		{"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
+		{"ApplicationProperties", map[string]interface{}(nil)},
 
 		// Deprecated
-		{m.Instructions(), map[string]interface{}(nil)},
-		{m.Annotations(), map[string]interface{}(nil)},
-		{m.Properties(), map[string]interface{}(nil)},
-		{m.Body(), nil},
+		{"Instructions", map[string]interface{}(nil)},
+		{"Annotations", map[string]interface{}(nil)},
+		{"Properties", map[string]interface{}(nil)},
+		{"Body", nil},
 	} {
-		if err := checkEqual(data[0], data[1]); err != nil {
-			t.Error(err)
+		ret := mv.MethodByName(x.method).Call(nil)
+		if err := checkEqual(x.want, ret[0].Interface()); err != nil {
+			t.Errorf("%s: %s", x.method, err)
 		}
 	}
 	if err := roundTrip(m); err != nil {
@@ -84,14 +88,17 @@ func TestMessageString(t *testing.T) {
 	m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"})
 	m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"})
 	m.SetApplicationProperties(map[string]interface{}{"int": int32(32)})
+	if err := roundTrip(m); err != nil {
+		t.Error(err)
+	}
 	msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
 	if err := checkEqual(msgstr, m.String()); err != nil {
 		t.Error(err)
 	}
 }
 
-func TestMessageRoundTrip(t *testing.T) {
-	m := NewMessage()
+// Set all message properties
+func setMessageProperties(m Message) Message {
 	m.SetInferred(false)
 	m.SetDurable(true)
 	m.SetPriority(42)
@@ -110,7 +117,25 @@ func TestMessageRoundTrip(t *testing.T) {
 	m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"})
 	m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"})
 	m.SetApplicationProperties(map[string]interface{}{"int": int32(32), "bool": true})
-	m.Marshal("hello")
+	return m
+}
+
+func TestMessageRoundTrip(t *testing.T) {
+	m1 := NewMessage()
+	setMessageProperties(m1)
+	m1.Marshal("hello")
+
+	buffer, err := m1.Encode(nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	m, err := DecodeMessage(buffer)
+	if err != nil {
+		t.Fatal(err)
+	}
+	if err = checkEqual(m1, m); err != nil {
+		t.Error(err)
+	}
 
 	for _, data := range [][]interface{}{
 		{m.Inferred(), false},
@@ -215,3 +240,54 @@ func TestMessageBodyTypes(t *testing.T) {
 
 	// TODO aconway 2015-09-08: array etc.
 }
+
+// Benchmarks assign to package-scope variables to prevent being optimized out.
+var bmM Message
+var bmBuf []byte
+
+func BenchmarkNewMessageEmpty(b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		bmM = NewMessage()
+	}
+}
+
+func BenchmarkNewMessageString(b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		bmM = NewMessageWith("hello")
+	}
+}
+
+func BenchmarkNewMessageAll(b *testing.B) {
+	for n := 0; n < b.N; n++ {
+		bmM = setMessageProperties(NewMessageWith("hello"))
+	}
+}
+
+func BenchmarkEncode(b *testing.B) {
+	m := setMessageProperties(NewMessageWith("hello"))
+	var buf []byte
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		buf, err := m.Encode(buf)
+		if err != nil {
+			b.Fatal(err)
+		}
+		bmBuf = buf
+	}
+}
+
+func BenchmarkDecode(b *testing.B) {
+	var buf []byte
+	buf, err := setMessageProperties(NewMessageWith("hello")).Encode(buf)
+	if err != nil {
+		b.Fatal(err)
+	}
+	m := NewMessage()
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		if err := m.Decode(buf); err != nil {
+			b.Fatal(err)
+		}
+		bmM = m
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/auth_test.go b/go/src/qpid.apache.org/electron/auth_test.go
index 162b366..30ef401 100644
--- a/go/src/qpid.apache.org/electron/auth_test.go
+++ b/go/src/qpid.apache.org/electron/auth_test.go
@@ -29,57 +29,40 @@ import (
 	"testing"
 )
 
-func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) {
-	client, server := newClientServerOpts(t, copts, sopts)
-	defer closeClientServer(client, server)
-
-	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingConnection:
-				got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()}
-			}
-			in.Accept()
-		}
-	}()
-
-	err = client.Sync()
-	return
-}
-
 func TestAuthAnonymous(t *testing.T) {
-	got, err := testAuthClientServer(t,
+	p := newPipe(t,
 		[]ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
 		[]ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
-	fatalIf(t, err)
-	errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got))
+	fatalIf(t, p.server.Sync())
+	errorIf(t, checkEqual("anonymous", p.server.User()))
+	errorIf(t, checkEqual("vhost", p.server.VirtualHost()))
 }
 
 func TestAuthPlain(t *testing.T) {
 	extendedSASL.startTest(t)
-	got, err := testAuthClientServer(t,
+	p := newPipe(t,
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
-	fatalIf(t, err)
-	errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
+	fatalIf(t, p.server.Sync())
+	errorIf(t, checkEqual("fred@proton", p.server.User()))
 }
 
 func TestAuthBadPass(t *testing.T) {
 	extendedSASL.startTest(t)
-	_, err := testAuthClientServer(t,
+	p := newPipe(t,
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
-	if err == nil {
+	if p.server.Sync() == nil {
 		t.Error("Expected auth failure for bad pass")
 	}
 }
 
 func TestAuthBadUser(t *testing.T) {
 	extendedSASL.startTest(t)
-	_, err := testAuthClientServer(t,
+	p := newPipe(t,
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
 		[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
-	if err == nil {
+	if p.server.Sync() == nil {
 		t.Error("Expected auth failure for bad user")
 	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/benchmark_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/benchmark_test.go b/go/src/qpid.apache.org/electron/benchmark_test.go
new file mode 100644
index 0000000..ae9d47c
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/benchmark_test.go
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"flag"
+	"strings"
+	"sync"
+	"testing"
+
+	"qpid.apache.org/amqp"
+)
+
+// To change capacity use
+//     go test -bench=. -args -capacity 100
+var capacity = flag.Int("capacity", 1000, "Prefetch capacity")
+var bodySize = flag.Int("bodySize", 1000, "Message body size")
+
+type bmCommon struct {
+	b    *testing.B
+	p    *pair
+	s    Sender
+	r    Receiver
+	ack  chan Outcome
+	done sync.WaitGroup
+}
+
+func makeBmCommon(p *pair, waitCount int) bmCommon {
+	bm := bmCommon{p: p, b: p.t.(*testing.B)}
+	bm.p.capacity = *capacity
+	bm.p.prefetch = true
+	bm.s, bm.r = p.sender()
+	bm.ack = make(chan Outcome, *capacity)
+	bm.done.Add(waitCount)
+	bm.b.ResetTimer()
+	return bm
+}
+
+func (bm *bmCommon) receiveAccept() {
+	defer bm.done.Done()
+	for n := 0; n < bm.b.N; n++ {
+		if rm, err := bm.r.Receive(); err != nil {
+			bm.b.Fatal(err)
+		} else {
+			fatalIf(bm.b, rm.Accept())
+		}
+	}
+}
+
+func (bm *bmCommon) outcomes() {
+	defer bm.done.Done()
+	for n := 0; n < bm.b.N; n++ {
+		fatalIf(bm.b, (<-bm.ack).Error)
+	}
+}
+
+var emptyMsg = amqp.NewMessage()
+
+func BenchmarkSendForget(b *testing.B) {
+	bm := makeBmCommon(newPipe(b, nil, nil), 1)
+	defer bm.p.close()
+
+	go func() { // Receive, no ack
+		defer bm.done.Done()
+		for n := 0; n < b.N; n++ {
+			if _, err := bm.r.Receive(); err != nil {
+				b.Fatal(err)
+			}
+		}
+	}()
+
+	for n := 0; n < b.N; n++ {
+		bm.s.SendForget(emptyMsg)
+	}
+	bm.done.Wait()
+}
+
+func BenchmarkSendSync(b *testing.B) {
+	bm := makeBmCommon(newPipe(b, nil, nil), 1)
+	defer bm.p.close()
+
+	go bm.receiveAccept()
+	for n := 0; n < b.N; n++ {
+		fatalIf(b, bm.s.SendSync(emptyMsg).Error)
+	}
+	bm.done.Wait()
+}
+
+func BenchmarkSendAsync(b *testing.B) {
+	bm := makeBmCommon(newPipe(b, nil, nil), 2)
+	defer bm.p.close()
+
+	go bm.outcomes()      // Handle outcomes
+	go bm.receiveAccept() // Receive
+	for n := 0; n < b.N; n++ {
+		bm.s.SendAsync(emptyMsg, bm.ack, nil)
+	}
+	bm.done.Wait()
+}
+
+// Create a new message for each send, with body and property.
+func BenchmarkSendAsyncNewMessage(b *testing.B) {
+	body := strings.Repeat("x", *bodySize)
+	bm := makeBmCommon(newPipe(b, nil, nil), 2)
+	defer bm.p.close()
+
+	go bm.outcomes()      // Handle outcomes
+	go bm.receiveAccept() // Receive
+	for n := 0; n < b.N; n++ {
+		msg := amqp.NewMessageWith(body)
+		msg.SetApplicationProperties(map[string]interface{}{"prop": "value"})
+		bm.s.SendAsync(msg, bm.ack, nil)
+	}
+	bm.done.Wait()
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/common_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go
new file mode 100644
index 0000000..3aad825
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/common_test.go
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"fmt"
+	"net"
+	"path"
+	"reflect"
+	"runtime"
+	"testing"
+)
+
+func decorate(err error, callDepth int) string {
+	_, file, line, _ := runtime.Caller(callDepth + 1) // annotate with location of caller.
+	_, file = path.Split(file)
+	return fmt.Sprintf("\n%s:%d: %v", file, line, err)
+}
+
+func fatalIfN(t testing.TB, err error, callDepth int) {
+	if err != nil {
+		t.Fatal(decorate(err, callDepth+1))
+	}
+}
+
+func fatalIf(t testing.TB, err error) {
+	fatalIfN(t, err, 1)
+}
+
+func errorIf(t testing.TB, err error) {
+	if err != nil {
+		t.Errorf(decorate(err, 1))
+	}
+}
+
+func checkEqual(want interface{}, got interface{}) error {
+	if !reflect.DeepEqual(want, got) {
+		return fmt.Errorf("(%#v != %#v)", want, got)
+	}
+	return nil
+}
+
+// AMQP client/server pair
+type pair struct {
+	t        testing.TB
+	client   Session
+	server   Connection
+	capacity int
+	prefetch bool
+	rchan    chan Receiver
+	schan    chan Sender
+	auth     connectionSettings
+}
+
+func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []ConnectionOption) *pair {
+	opts := append([]ConnectionOption{Server()}, serverOpts...)
+	sc, _ := NewConnection(srv, opts...)
+	opts = append([]ConnectionOption{}, clientOpts...)
+	cc, _ := NewConnection(cli, opts...)
+	cs, _ := cc.Session()
+	p := &pair{
+		t:        t,
+		client:   cs,
+		server:   sc,
+		capacity: 100,
+		rchan:    make(chan Receiver),
+		schan:    make(chan Sender)}
+
+	go func() {
+		for i := range p.server.Incoming() {
+			switch i := i.(type) {
+			case *IncomingReceiver:
+				if p.capacity > 0 {
+					i.SetCapacity(p.capacity)
+				}
+				i.SetPrefetch(p.prefetch)
+				p.rchan <- i.Accept().(Receiver)
+				break
+			case *IncomingSender:
+				p.schan <- i.Accept().(Sender)
+			default:
+				i.Accept()
+			}
+		}
+	}()
+
+	return p
+}
+
+// AMQP pair linked by in-memory pipe
+func newPipe(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
+	cli, srv := net.Pipe()
+	return newPair(t, cli, srv, clientOpts, serverOpts)
+}
+
+// AMQP pair linked by TCP socket
+func newSocketPair(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
+	l, err := net.Listen("tcp4", ":0") // For systems with ipv6 disabled
+	fatalIfN(t, err, 1)
+	srvCh := make(chan net.Conn)
+	var srvErr error
+	go func() {
+		var c net.Conn
+		c, srvErr = l.Accept()
+		srvCh <- c
+	}()
+	addr := l.Addr()
+	cli, err := net.Dial(addr.Network(), addr.String())
+	fatalIfN(t, err, 1)
+	srv := <-srvCh
+	fatalIfN(t, srvErr, 1)
+	return newPair(t, cli, srv, clientOpts, serverOpts)
+}
+
+func (p *pair) close() { p.client.Connection().Close(nil); p.server.Close(nil) }
+
+// Return a client sender and server receiver
+func (p *pair) sender(opts ...LinkOption) (Sender, Receiver) {
+	snd, err := p.client.Sender(opts...)
+	fatalIfN(p.t, err, 2)
+	rcv := <-p.rchan
+	return snd, rcv
+}
+
+// Return a client receiver and server sender
+func (p *pair) receiver(opts ...LinkOption) (Receiver, Sender) {
+	rcv, err := p.client.Receiver(opts...)
+	fatalIfN(p.t, err, 2)
+	snd := <-p.schan
+	return rcv, snd
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/electron_test.go b/go/src/qpid.apache.org/electron/electron_test.go
index 74759f5..c8a51c7 100644
--- a/go/src/qpid.apache.org/electron/electron_test.go
+++ b/go/src/qpid.apache.org/electron/electron_test.go
@@ -21,119 +21,23 @@ package electron
 
 import (
 	"fmt"
-	"net"
-	"path"
 	"qpid.apache.org/amqp"
-	"reflect"
-	"runtime"
 	"testing"
 	"time"
 )
 
-func fatalIf(t *testing.T, err error) {
-	if err != nil {
-		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
-		if ok {
-			_, file = path.Split(file)
-		}
-		t.Fatalf("(from %s:%d) %v", file, line, err)
-	}
-}
-
-func errorIf(t *testing.T, err error) {
-	if err != nil {
-		_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
-		if ok {
-			_, file = path.Split(file)
-		}
-		t.Errorf("(from %s:%d) %v", file, line, err)
-	}
-}
-
-func checkEqual(want interface{}, got interface{}) error {
-	if !reflect.DeepEqual(want, got) {
-		return fmt.Errorf("%#v != %#v", want, got)
-	}
-	return nil
-}
-
-// Start a server, return listening addr and channel for incoming Connections.
-func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr, <-chan Connection) {
-	listener, err := net.Listen("tcp4", "") // For systems with ipv6 disabled
-	fatalIf(t, err)
-	addr := listener.Addr()
-	ch := make(chan Connection)
-	go func() {
-		conn, err := listener.Accept()
-		c, err := cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...)
-		fatalIf(t, err)
-		ch <- c
-	}()
-	return addr, ch
-}
-
-// Open a client connection and session, return the session.
-func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session {
-	conn, err := net.Dial(addr.Network(), addr.String())
-	fatalIf(t, err)
-	// Don't  bother checking error here, it's an async error so it's racy to do so anyway.
-	// Let caller use Sync() or catch it on first use.
-	c, _ := cont.Connection(conn, opts...)
-	sn, _ := c.Session()
-	return sn
-}
-
-// Return client and server ends of the same connection.
-func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (client Session, server Connection) {
-	addr, ch := newServer(t, NewContainer("test-server"), sopts...)
-	client = newClient(t, NewContainer("test-client"), addr, copts...)
-	return client, <-ch
-}
-
-// Return client and server ends of the same connection.
-func newClientServer(t *testing.T) (client Session, server Connection) {
-	return newClientServerOpts(t, nil, nil)
-}
-
-// Close client and server
-func closeClientServer(client Session, server Connection) {
-	client.Connection().Close(nil)
-	server.Close(nil)
-}
-
 // Send a message one way with a client sender and server receiver, verify ack.
-func TestClientSendServerReceive(t *testing.T) {
+func TestClientSender(t *testing.T) {
+	p := newPipe(t, nil, nil)
+	defer func() { p.close() }()
+
 	nLinks := 3
 	nMessages := 3
 
-	rchan := make(chan Receiver, nLinks)
-	client, server := newClientServer(t)
-	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingReceiver:
-				in.SetCapacity(1)
-				in.SetPrefetch(false)
-				rchan <- in.Accept().(Receiver)
-			default:
-				in.Accept()
-			}
-		}
-	}()
-
-	defer func() { closeClientServer(client, server) }()
-
 	s := make([]Sender, nLinks)
-	for i := 0; i < nLinks; i++ {
-		var err error
-		s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
-		if err != nil {
-			t.Fatal(err)
-		}
-	}
 	r := make([]Receiver, nLinks)
 	for i := 0; i < nLinks; i++ {
-		r[i] = <-rchan
+		s[i], r[i] = p.sender(Target(fmt.Sprintf("foo%d", i)))
 	}
 
 	for i := 0; i < nLinks; i++ {
@@ -146,16 +50,12 @@ func TestClientSendServerReceive(t *testing.T) {
 				m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
 				var err error
 				s[i].SendAsync(m, ack, "testing")
-				if err != nil {
-					t.Fatal(err)
-				}
+				fatalIf(t, err)
 			}()
 
 			// Server receive
 			rm, err := r[i].Receive()
-			if err != nil {
-				t.Fatal(err)
-			}
+			fatalIf(t, err)
 			if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
 				t.Errorf("%#v != %#v", want, got)
 			}
@@ -182,76 +82,28 @@ func TestClientSendServerReceive(t *testing.T) {
 
 func TestClientReceiver(t *testing.T) {
 	nMessages := 3
-	client, server := newClientServer(t)
+	p := newPipe(t, nil, nil)
+	defer func() { p.close() }()
+	r, s := p.receiver(Source("foo"), Capacity(nMessages), Prefetch(true))
 	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingSender:
-				s := in.Accept().(Sender)
-				go func() {
-					for i := int32(0); i < int32(nMessages); i++ {
-						out := s.SendSync(amqp.NewMessageWith(i))
-						if out.Error != nil {
-							t.Error(out.Error)
-							return
-						}
-					}
-					s.Close(nil)
-				}()
-			default:
-				in.Accept()
-			}
+		for i := 0; i < nMessages; i++ { // Server sends
+			out := s.SendSync(amqp.NewMessageWith(int32(i)))
+			fatalIf(t, out.Error)
 		}
 	}()
-
-	r, err := client.Receiver(Source("foo"))
-	if err != nil {
-		t.Fatal(err)
-	}
-	for i := int32(0); i < int32(nMessages); i++ {
+	for i := 0; i < nMessages; i++ { // Client receives
 		rm, err := r.Receive()
-		if err != nil {
-			if err != Closed {
-				t.Error(err)
-			}
-			break
-		}
-		if err := rm.Accept(); err != nil {
-			t.Error(err)
-		}
-		if b, ok := rm.Message.Body().(int32); !ok || b != i {
-			t.Errorf("want %v, true got %v, %v", i, b, ok)
-		}
+		fatalIf(t, err)
+		errorIf(t, checkEqual(int32(i), rm.Message.Body()))
+		errorIf(t, rm.Accept())
 	}
-	server.Close(nil)
-	client.Connection().Close(nil)
 }
 
 // Test timeout versions of waiting functions.
 func TestTimeouts(t *testing.T) {
-	var err error
-	rchan := make(chan Receiver, 1)
-	client, server := newClientServer(t)
-	go func() {
-		for i := range server.Incoming() {
-			switch i := i.(type) {
-			case *IncomingReceiver:
-				i.SetCapacity(1)
-				i.SetPrefetch(false)
-				rchan <- i.Accept().(Receiver) // Issue credit only on receive
-			default:
-				i.Accept()
-			}
-		}
-	}()
-	defer func() { closeClientServer(client, server) }()
-
-	// Open client sender
-	snd, err := client.Sender(Target("test"))
-	if err != nil {
-		t.Fatal(err)
-	}
-	rcv := <-rchan
+	p := newPipe(t, nil, nil)
+	defer func() { p.close() }()
+	snd, rcv := p.sender(Target("test"))
 
 	// Test send with timeout
 	short := time.Millisecond
@@ -264,11 +116,11 @@ func TestTimeouts(t *testing.T) {
 		t.Error("want Timeout got", err)
 	}
 	// Test receive with timeout
-	if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
+	if _, err := rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
 		t.Error("want Timeout got", err)
 	}
 	// Test receive with timeout
-	if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
+	if _, err := rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
 		t.Error("want Timeout got", err)
 	}
 	// There is now a credit on the link due to receive
@@ -295,57 +147,6 @@ func TestTimeouts(t *testing.T) {
 	}
 }
 
-// A server that returns the opposite end of each client link via channels.
-type pairs struct {
-	t        *testing.T
-	client   Session
-	server   Connection
-	rchan    chan Receiver
-	schan    chan Sender
-	capacity int
-	prefetch bool
-}
-
-func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
-	p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
-	p.client, p.server = newClientServer(t)
-	go func() {
-		for i := range p.server.Incoming() {
-			switch i := i.(type) {
-			case *IncomingReceiver:
-				i.SetCapacity(capacity)
-				i.SetPrefetch(prefetch)
-				p.rchan <- i.Accept().(Receiver)
-			case *IncomingSender:
-				p.schan <- i.Accept().(Sender)
-			default:
-				i.Accept()
-			}
-		}
-	}()
-	return p
-}
-
-func (p *pairs) close() {
-	closeClientServer(p.client, p.server)
-}
-
-// Return a client sender and server receiver
-func (p *pairs) senderReceiver() (Sender, Receiver) {
-	snd, err := p.client.Sender()
-	fatalIf(p.t, err)
-	rcv := <-p.rchan
-	return snd, rcv
-}
-
-// Return a client receiver and server sender
-func (p *pairs) receiverSender() (Receiver, Sender) {
-	rcv, err := p.client.Receiver()
-	fatalIf(p.t, err)
-	snd := <-p.schan
-	return rcv, snd
-}
-
 type result struct {
 	label string
 	err   error
@@ -370,8 +171,9 @@ func doDisposition(ack <-chan Outcome, results chan result) {
 
 // Senders get credit immediately if receivers have prefetch set
 func TestSendReceivePrefetch(t *testing.T) {
-	pairs := newPairs(t, 1, true)
-	s, r := pairs.senderReceiver()
+	p := newPipe(t, nil, nil)
+	p.prefetch = true
+	s, r := p.sender()
 	s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
 	if _, err := r.Receive(); err != nil {
 		t.Error(err)
@@ -380,8 +182,9 @@ func TestSendReceivePrefetch(t *testing.T) {
 
 // Senders do not get credit till Receive() if receivers don't have prefetch
 func TestSendReceiveNoPrefetch(t *testing.T) {
-	pairs := newPairs(t, 1, false)
-	s, r := pairs.senderReceiver()
+	p := newPipe(t, nil, nil)
+	p.prefetch = false
+	s, r := p.sender()
 	done := make(chan struct{}, 1)
 	go func() {
 		s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
@@ -402,14 +205,14 @@ func TestSendReceiveNoPrefetch(t *testing.T) {
 // Test that closing Links interrupts blocked link functions.
 func TestLinkCloseInterrupt(t *testing.T) {
 	want := amqp.Error{Name: "x", Description: "all bad"}
-	pairs := newPairs(t, 1, false)
+	p := newPipe(t, nil, nil)
 	results := make(chan result) // Collect expected errors
 
 	// Note closing the link does not interrupt Send() calls, the AMQP spec says
 	// that deliveries can be settled after the link is closed.
 
 	// Receiver.Close() interrupts Receive()
-	snd, rcv := pairs.senderReceiver()
+	snd, rcv := p.sender()
 	go doReceive(rcv, results)
 	rcv.Close(want)
 	if r := <-results; want != r.err {
@@ -417,7 +220,7 @@ func TestLinkCloseInterrupt(t *testing.T) {
 	}
 
 	// Remote Sender.Close() interrupts Receive()
-	snd, rcv = pairs.senderReceiver()
+	snd, rcv = p.sender()
 	go doReceive(rcv, results)
 	snd.Close(want)
 	if r := <-results; want != r.err {
@@ -428,27 +231,28 @@ func TestLinkCloseInterrupt(t *testing.T) {
 // Test closing the server end of a connection.
 func TestConnectionCloseInterrupt1(t *testing.T) {
 	want := amqp.Error{Name: "x", Description: "bad"}
-	pairs := newPairs(t, 1, true)
+	p := newSocketPair(t, nil, nil)
+	p.prefetch = true
 	results := make(chan result) // Collect expected errors
 
 	// Connection.Close() interrupts Send, Receive, Disposition.
-	snd, rcv := pairs.senderReceiver()
+	snd, rcv := p.sender()
 	go doSend(snd, results)
 
 	if _, err := rcv.Receive(); err != nil {
 		t.Error("receive", err)
 	}
-	rcv, snd = pairs.receiverSender()
+	rcv, snd = p.receiver()
 	go doReceive(rcv, results)
 
-	snd, rcv = pairs.senderReceiver()
+	snd, rcv = p.sender()
 	ack := snd.SendWaitable(amqp.NewMessage())
 	if _, err := rcv.Receive(); err != nil {
 		t.Error("receive", err)
 	}
 	go doDisposition(ack, results)
 
-	pairs.server.Close(want)
+	p.server.Close(want)
 	for i := 0; i < 3; i++ {
 		if r := <-results; want != r.err {
 			t.Errorf("want %v got %v", want, r)
@@ -459,24 +263,25 @@ func TestConnectionCloseInterrupt1(t *testing.T) {
 // Test closing the client end of the connection.
 func TestConnectionCloseInterrupt2(t *testing.T) {
 	want := amqp.Error{Name: "x", Description: "bad"}
-	pairs := newPairs(t, 1, true)
+	p := newSocketPair(t, nil, nil)
+	p.prefetch = true
 	results := make(chan result) // Collect expected errors
 
 	// Connection.Close() interrupts Send, Receive, Disposition.
-	snd, rcv := pairs.senderReceiver()
+	snd, rcv := p.sender()
 	go doSend(snd, results)
 	if _, err := rcv.Receive(); err != nil {
 		t.Error("receive", err)
 	}
 
-	rcv, snd = pairs.receiverSender()
+	rcv, snd = p.receiver()
 	go doReceive(rcv, results)
 
-	snd, rcv = pairs.senderReceiver()
+	snd, rcv = p.sender()
 	ack := snd.SendWaitable(amqp.NewMessage())
 	go doDisposition(ack, results)
 
-	pairs.client.Connection().Close(want)
+	p.client.Connection().Close(want)
 	for i := 0; i < 3; i++ {
 		if r := <-results; want != r.err {
 			t.Errorf("want %v got %v", want, r.err)
@@ -484,63 +289,43 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
 	}
 }
 
-func heartbeat(c Connection) time.Duration {
-	return c.(*connection).engine.Transport().RemoteIdleTimeout()
-}
-
 func TestHeartbeat(t *testing.T) {
-	client, server := newClientServerOpts(t,
-		[]ConnectionOption{Heartbeat(102 * time.Millisecond)},
-		nil)
-	defer closeClientServer(client, server)
-
-	var serverHeartbeat time.Duration
-
-	go func() {
-		for in := range server.Incoming() {
-			switch in := in.(type) {
-			case *IncomingConnection:
-				serverHeartbeat = in.Heartbeat()
-				in.AcceptConnection(Heartbeat(101 * time.Millisecond))
-			default:
-				in.Accept()
-			}
-		}
-	}()
+	p := newSocketPair(t,
+		[]ConnectionOption{Heartbeat(12 * time.Millisecond)},
+		[]ConnectionOption{Heartbeat(11 * time.Millisecond)})
+	defer func() { p.close() }()
 
-	// Freeze the server to stop it sending heartbeats.
+	// Function to freeze the server to stop it sending heartbeats.
 	unfreeze := make(chan bool)
 	defer close(unfreeze)
-	freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) }
+	freeze := func() error { return p.server.(*connection).engine.Inject(func() { <-unfreeze }) }
 
-	fatalIf(t, client.Sync())
-	errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection())))
-	errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat))
-	errorIf(t, client.Connection().Error())
+	fatalIf(t, p.client.Sync())
+	errorIf(t, checkEqual(11*time.Millisecond, p.client.Connection().Heartbeat()))
+	errorIf(t, checkEqual(12*time.Millisecond, p.server.Heartbeat()))
 
 	// Freeze the server for less than a heartbeat
 	fatalIf(t, freeze())
-	time.Sleep(50 * time.Millisecond)
+	time.Sleep(5 * time.Millisecond)
 	unfreeze <- true
 	// Make sure server is still responding.
-	s, err := client.Sender()
-	errorIf(t, err)
+	s, _ := p.sender()
 	errorIf(t, s.Sync())
 
-	// Freeze the server till the client times out the connection
+	// Freeze the server till the p.client times out the connection
 	fatalIf(t, freeze())
 	select {
-	case <-client.Done():
-		if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name {
-			t.Error("bad timeout error:", client.Error())
+	case <-p.client.Done():
+		if amqp.ResourceLimitExceeded != p.client.Error().(amqp.Error).Name {
+			t.Error("bad timeout error:", p.client.Error())
 		}
 	case <-time.After(400 * time.Millisecond):
 		t.Error("connection failed to time out")
 	}
 
 	unfreeze <- true // Unfreeze the server
-	<-server.Done()
-	if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name {
-		t.Error("bad timeout error:", server.Error())
+	<-p.server.Done()
+	if amqp.ResourceLimitExceeded != p.server.Error().(amqp.Error).Name {
+		t.Error("bad timeout error:", p.server.Error())
 	}
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org