You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/10/11 22:21:34 UTC
[02/12] qpid-proton git commit: PROTON-1910: [go] test refactor and
benchmarks
PROTON-1910: [go] test refactor and benchmarks
- Simplify common test tools, move to common_test.go
- Use net.Pipe for most tests, more efficient than a full network socket
- Added simple benchmarks
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d9b4b989
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d9b4b989
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d9b4b989
Branch: refs/heads/master
Commit: d9b4b9893412676e947d693a7d4e336fad4d4110
Parents: 486fbaf
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Sep 21 00:51:45 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:32:59 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/amqp/message_test.go | 138 ++++++--
go/src/qpid.apache.org/electron/auth_test.go | 39 +--
.../qpid.apache.org/electron/benchmark_test.go | 132 ++++++++
go/src/qpid.apache.org/electron/common_test.go | 148 ++++++++
.../qpid.apache.org/electron/electron_test.go | 337 ++++---------------
5 files changed, 459 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go
index 663e82f..b22c60d 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -20,6 +20,7 @@ under the License.
package amqp
import (
+ "reflect"
"testing"
"time"
)
@@ -28,9 +29,7 @@ func roundTrip(m Message) error {
var err error
if buffer, err := m.Encode(nil); err == nil {
if m2, err := DecodeMessage(buffer); err == nil {
- if err = checkEqual(m, m2); err == nil {
- err = checkEqual(m.String(), m2.String())
- }
+ err = checkEqual(m, m2)
}
}
return err
@@ -38,35 +37,40 @@ func roundTrip(m Message) error {
func TestDefaultMessage(t *testing.T) {
m := NewMessage()
+ mv := reflect.ValueOf(m)
// Check defaults
- for _, data := range [][]interface{}{
- {m.Inferred(), false},
- {m.Durable(), false},
- {m.Priority(), uint8(4)},
- {m.TTL(), time.Duration(0)},
- {m.UserId(), ""},
- {m.Address(), ""},
- {m.Subject(), ""},
- {m.ReplyTo(), ""},
- {m.ContentType(), ""},
- {m.ContentEncoding(), ""},
- {m.GroupId(), ""},
- {m.GroupSequence(), int32(0)},
- {m.ReplyToGroupId(), ""},
- {m.MessageId(), nil},
- {m.CorrelationId(), nil},
- {m.DeliveryAnnotations(), map[AnnotationKey]interface{}(nil)},
- {m.MessageAnnotations(), map[AnnotationKey]interface{}(nil)},
- {m.ApplicationProperties(), map[string]interface{}(nil)},
+ for _, x := range []struct {
+ method string
+ want interface{}
+ }{
+ {"Inferred", false},
+ {"Durable", false},
+ {"Priority", uint8(4)},
+ {"TTL", time.Duration(0)},
+ {"UserId", ""},
+ {"Address", ""},
+ {"Subject", ""},
+ {"ReplyTo", ""},
+ {"ContentType", ""},
+ {"ContentEncoding", ""},
+ {"GroupId", ""},
+ {"GroupSequence", int32(0)},
+ {"ReplyToGroupId", ""},
+ {"MessageId", nil},
+ {"CorrelationId", nil},
+ {"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
+ {"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
+ {"ApplicationProperties", map[string]interface{}(nil)},
// Deprecated
- {m.Instructions(), map[string]interface{}(nil)},
- {m.Annotations(), map[string]interface{}(nil)},
- {m.Properties(), map[string]interface{}(nil)},
- {m.Body(), nil},
+ {"Instructions", map[string]interface{}(nil)},
+ {"Annotations", map[string]interface{}(nil)},
+ {"Properties", map[string]interface{}(nil)},
+ {"Body", nil},
} {
- if err := checkEqual(data[0], data[1]); err != nil {
- t.Error(err)
+ ret := mv.MethodByName(x.method).Call(nil)
+ if err := checkEqual(x.want, ret[0].Interface()); err != nil {
+ t.Errorf("%s: %s", x.method, err)
}
}
if err := roundTrip(m); err != nil {
@@ -84,14 +88,17 @@ func TestMessageString(t *testing.T) {
m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"})
m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"})
m.SetApplicationProperties(map[string]interface{}{"int": int32(32)})
+ if err := roundTrip(m); err != nil {
+ t.Error(err)
+ }
msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
if err := checkEqual(msgstr, m.String()); err != nil {
t.Error(err)
}
}
-func TestMessageRoundTrip(t *testing.T) {
- m := NewMessage()
+// Set all message properties
+func setMessageProperties(m Message) Message {
m.SetInferred(false)
m.SetDurable(true)
m.SetPriority(42)
@@ -110,7 +117,25 @@ func TestMessageRoundTrip(t *testing.T) {
m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"})
m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"})
m.SetApplicationProperties(map[string]interface{}{"int": int32(32), "bool": true})
- m.Marshal("hello")
+ return m
+}
+
+func TestMessageRoundTrip(t *testing.T) {
+ m1 := NewMessage()
+ setMessageProperties(m1)
+ m1.Marshal("hello")
+
+ buffer, err := m1.Encode(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ m, err := DecodeMessage(buffer)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err = checkEqual(m1, m); err != nil {
+ t.Error(err)
+ }
for _, data := range [][]interface{}{
{m.Inferred(), false},
@@ -215,3 +240,54 @@ func TestMessageBodyTypes(t *testing.T) {
// TODO aconway 2015-09-08: array etc.
}
+
+// Package-scope sinks. Benchmarks store their results here so the measured
+// work cannot be optimized out.
+var (
+	bmM   Message
+	bmBuf []byte
+)
+
+// BenchmarkNewMessageEmpty measures NewMessage, storing each result in the
+// package-scope sink bmM.
+func BenchmarkNewMessageEmpty(b *testing.B) {
+	for remaining := b.N; remaining > 0; remaining-- {
+		bmM = NewMessage()
+	}
+}
+
+// BenchmarkNewMessageString measures NewMessageWith for a short string body,
+// storing each result in the package-scope sink bmM.
+func BenchmarkNewMessageString(b *testing.B) {
+	for remaining := b.N; remaining > 0; remaining-- {
+		bmM = NewMessageWith("hello")
+	}
+}
+
+// BenchmarkNewMessageAll measures creating a message with a string body and
+// then applying setMessageProperties to it.
+func BenchmarkNewMessageAll(b *testing.B) {
+	for remaining := b.N; remaining > 0; remaining-- {
+		msg := NewMessageWith("hello")
+		bmM = setMessageProperties(msg)
+	}
+}
+
+// BenchmarkEncode measures Message.Encode on a message that has had
+// setMessageProperties applied.
+//
+// The buffer returned by each Encode call is passed to the next call, so an
+// implementation that can reuse its input buffer gets the chance to do so.
+// NOTE(review): buffer reuse is presumed from the Encode(buf) signature;
+// confirm against the amqp package documentation.
+func BenchmarkEncode(b *testing.B) {
+	m := setMessageProperties(NewMessageWith("hello"))
+	var (
+		buf []byte
+		err error
+	)
+	b.ResetTimer()
+	for n := 0; n < b.N; n++ {
+		// Plain assignment on purpose: ":=" here would declare a fresh buf
+		// scoped to the loop body, leaving the outer buf nil on every
+		// iteration.
+		if buf, err = m.Encode(buf); err != nil {
+			b.Fatal(err)
+		}
+		bmBuf = buf
+	}
+}
+
+// BenchmarkDecode measures Message.Decode. One message is encoded before the
+// timer is reset; the loop then decodes those bytes into the same Message
+// value each time.
+func BenchmarkDecode(b *testing.B) {
+	encoded, err := setMessageProperties(NewMessageWith("hello")).Encode(nil)
+	if err != nil {
+		b.Fatal(err)
+	}
+	target := NewMessage()
+	b.ResetTimer()
+	for i := 0; i < b.N; i++ {
+		if decodeErr := target.Decode(encoded); decodeErr != nil {
+			b.Fatal(decodeErr)
+		}
+		bmM = target
+	}
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/auth_test.go b/go/src/qpid.apache.org/electron/auth_test.go
index 162b366..30ef401 100644
--- a/go/src/qpid.apache.org/electron/auth_test.go
+++ b/go/src/qpid.apache.org/electron/auth_test.go
@@ -29,57 +29,40 @@ import (
"testing"
)
-func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) {
- client, server := newClientServerOpts(t, copts, sopts)
- defer closeClientServer(client, server)
-
- go func() {
- for in := range server.Incoming() {
- switch in := in.(type) {
- case *IncomingConnection:
- got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()}
- }
- in.Accept()
- }
- }()
-
- err = client.Sync()
- return
-}
-
func TestAuthAnonymous(t *testing.T) {
- got, err := testAuthClientServer(t,
+ p := newPipe(t,
[]ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
[]ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
- fatalIf(t, err)
- errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got))
+ fatalIf(t, p.server.Sync())
+ errorIf(t, checkEqual("anonymous", p.server.User()))
+ errorIf(t, checkEqual("vhost", p.server.VirtualHost()))
}
func TestAuthPlain(t *testing.T) {
extendedSASL.startTest(t)
- got, err := testAuthClientServer(t,
+ p := newPipe(t,
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
- fatalIf(t, err)
- errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
+ fatalIf(t, p.server.Sync())
+ errorIf(t, checkEqual("fred@proton", p.server.User()))
}
func TestAuthBadPass(t *testing.T) {
extendedSASL.startTest(t)
- _, err := testAuthClientServer(t,
+ p := newPipe(t,
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
- if err == nil {
+ if p.server.Sync() == nil {
t.Error("Expected auth failure for bad pass")
}
}
func TestAuthBadUser(t *testing.T) {
extendedSASL.startTest(t)
- _, err := testAuthClientServer(t,
+ p := newPipe(t,
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
- if err == nil {
+ if p.server.Sync() == nil {
t.Error("Expected auth failure for bad user")
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/benchmark_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/benchmark_test.go b/go/src/qpid.apache.org/electron/benchmark_test.go
new file mode 100644
index 0000000..ae9d47c
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/benchmark_test.go
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "flag"
+ "strings"
+ "sync"
+ "testing"
+
+ "qpid.apache.org/amqp"
+)
+
+// Benchmark tuning flags. To change capacity use
+//     go test -bench=. -args -capacity 100
+var (
+	capacity = flag.Int("capacity", 1000, "Prefetch capacity")
+	bodySize = flag.Int("bodySize", 1000, "Message body size")
+)
+
+// bmCommon bundles the state shared by the send benchmarks in this file.
+type bmCommon struct {
+	b    *testing.B     // benchmark being run; supplies N and failure reporting
+	p    *pair          // client/server pair the links were created on
+	s    Sender         // client-side sender obtained from p.sender()
+	r    Receiver       // server-side receiver obtained from p.sender()
+	ack  chan Outcome   // outcomes of asynchronous sends; buffered to *capacity
+	done sync.WaitGroup // counts helper goroutines the benchmark waits for
+}
+
+// makeBmCommon prepares a benchmark on pair p: it applies the -capacity flag
+// and turns prefetch on, creates the sender/receiver link pair, registers
+// waitCount helper goroutines on the WaitGroup and resets the timer.
+// p.t must hold a *testing.B; the type assertion panics otherwise.
+//
+// NOTE(review): the result is returned by value after done.Add has been
+// called, which copies a sync.WaitGroup that is already in use — confirm this
+// is acceptable or switch to a pointer.
+func makeBmCommon(p *pair, waitCount int) bmCommon {
+	bench := p.t.(*testing.B)
+	p.capacity = *capacity
+	p.prefetch = true
+	snd, rcv := p.sender()
+	bm := bmCommon{
+		b:   bench,
+		p:   p,
+		s:   snd,
+		r:   rcv,
+		ack: make(chan Outcome, *capacity),
+	}
+	bm.done.Add(waitCount)
+	bench.ResetTimer()
+	return bm
+}
+
+// receiveAccept receives and accepts b.N messages on bm.r, then signals
+// bm.done. It is meant to run in its own goroutine.
+//
+// Failures are reported with Error followed by return, not Fatal: the testing
+// package requires FailNow (and so Fatal) to be called from the goroutine
+// running the benchmark function, which this is not.
+func (bm *bmCommon) receiveAccept() {
+	defer bm.done.Done()
+	for n := 0; n < bm.b.N; n++ {
+		rm, err := bm.r.Receive()
+		if err != nil {
+			bm.b.Error(err)
+			return
+		}
+		if err := rm.Accept(); err != nil {
+			errorIf(bm.b, err)
+			return
+		}
+	}
+}
+
+// outcomes drains b.N outcomes from bm.ack, then signals bm.done. It is meant
+// to run in its own goroutine.
+//
+// The first outcome carrying an error is reported with errorIf and ends the
+// loop. Fatal is avoided because the testing package only allows FailNow from
+// the goroutine running the benchmark function.
+func (bm *bmCommon) outcomes() {
+	defer bm.done.Done()
+	for n := 0; n < bm.b.N; n++ {
+		if err := (<-bm.ack).Error; err != nil {
+			errorIf(bm.b, err)
+			return
+		}
+	}
+}
+
+// emptyMsg is the single message reused by the benchmarks that do not build a
+// new message per send.
+var emptyMsg = amqp.NewMessage()
+
+// BenchmarkSendForget measures SendForget: the client sends b.N messages
+// without waiting for outcomes while a goroutine receives them on the server
+// side without acknowledging.
+func BenchmarkSendForget(b *testing.B) {
+	bm := makeBmCommon(newPipe(b, nil, nil), 1)
+	defer bm.p.close()
+
+	go func() { // Receive, no ack
+		defer bm.done.Done()
+		for n := 0; n < b.N; n++ {
+			if _, err := bm.r.Receive(); err != nil {
+				// Error+return, not Fatal: FailNow may only be called from
+				// the goroutine running the benchmark function.
+				b.Error(err)
+				return
+			}
+		}
+	}()
+
+	for n := 0; n < b.N; n++ {
+		bm.s.SendForget(emptyMsg)
+	}
+	bm.done.Wait()
+}
+
+// BenchmarkSendSync measures SendSync: each of the b.N sends waits for its
+// outcome while a helper goroutine receives and accepts on the server side.
+func BenchmarkSendSync(b *testing.B) {
+	bm := makeBmCommon(newPipe(b, nil, nil), 1)
+	defer bm.p.close()
+
+	go bm.receiveAccept()
+	for sent := 0; sent < b.N; sent++ {
+		outcome := bm.s.SendSync(emptyMsg)
+		fatalIf(b, outcome.Error)
+	}
+	bm.done.Wait()
+}
+
+// BenchmarkSendAsync measures SendAsync with the shared emptyMsg. Outcomes
+// arrive on bm.ack and are drained by one helper goroutine while another
+// receives and accepts the messages.
+func BenchmarkSendAsync(b *testing.B) {
+	bm := makeBmCommon(newPipe(b, nil, nil), 2)
+	defer bm.p.close()
+
+	go bm.outcomes()      // Handle outcomes
+	go bm.receiveAccept() // Receive
+	for sent := 0; sent < b.N; sent++ {
+		bm.s.SendAsync(emptyMsg, bm.ack, nil)
+	}
+	bm.done.Wait()
+}
+
+// BenchmarkSendAsyncNewMessage is like BenchmarkSendAsync but builds a new
+// message for every send, with a body of -bodySize bytes and one application
+// property.
+func BenchmarkSendAsyncNewMessage(b *testing.B) {
+	payload := strings.Repeat("x", *bodySize)
+	bm := makeBmCommon(newPipe(b, nil, nil), 2)
+	defer bm.p.close()
+
+	go bm.outcomes()      // Handle outcomes
+	go bm.receiveAccept() // Receive
+	for sent := 0; sent < b.N; sent++ {
+		m := amqp.NewMessageWith(payload)
+		m.SetApplicationProperties(map[string]interface{}{"prop": "value"})
+		bm.s.SendAsync(m, bm.ack, nil)
+	}
+	bm.done.Wait()
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/common_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go
new file mode 100644
index 0000000..3aad825
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/common_test.go
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "fmt"
+ "net"
+ "path"
+ "reflect"
+ "runtime"
+ "testing"
+)
+
+// decorate formats err prefixed with a newline and the "file:line" of a
+// calling frame. callDepth 0 selects the function that called decorate; each
+// increment moves one frame further up the stack. Only the base name of the
+// source file is kept.
+func decorate(err error, callDepth int) string {
+	_, fullPath, lineNo, _ := runtime.Caller(callDepth + 1) // +1 skips decorate itself
+	_, base := path.Split(fullPath)
+	return fmt.Sprintf("\n%s:%d: %v", base, lineNo, err)
+}
+
+// fatalIfN fails the test immediately via t.Fatal when err is non-nil. The
+// message is annotated with the location callDepth frames above the function
+// that called fatalIfN. It does nothing when err is nil.
+func fatalIfN(t testing.TB, err error, callDepth int) {
+	if err == nil {
+		return
+	}
+	// +1 accounts for the fatalIfN frame itself.
+	t.Fatal(decorate(err, callDepth+1))
+}
+
+// fatalIf fails the test immediately via t.Fatal when err is non-nil,
+// annotating the message with the location of fatalIf's caller.
+func fatalIf(t testing.TB, err error) {
+	if err != nil {
+		// Depth 1 skips this frame so the reported line is the caller's.
+		t.Fatal(decorate(err, 1))
+	}
+}
+
+// errorIf records a test failure when err is non-nil and lets the test
+// continue. The message is annotated with the location of errorIf's caller.
+// It does nothing when err is nil.
+func errorIf(t testing.TB, err error) {
+	if err != nil {
+		// Error, not Errorf: the decorated text is a finished message, not a
+		// format string, so any '%' in it must be printed verbatim.
+		t.Error(decorate(err, 1))
+	}
+}
+
+// checkEqual compares want and got with reflect.DeepEqual. It returns nil
+// when they are equal, otherwise an error showing both values in Go syntax.
+func checkEqual(want interface{}, got interface{}) error {
+	if reflect.DeepEqual(want, got) {
+		return nil
+	}
+	return fmt.Errorf("(%#v != %#v)", want, got)
+}
+
+// pair is an AMQP client session and the server connection it talks to,
+// plus the channels on which the server end of each client-created link is
+// handed back to the test.
+type pair struct {
+	t        testing.TB    // test or benchmark used to report failures
+	client   Session       // client-side session
+	server   Connection    // server-side connection
+	capacity int           // applied to incoming receivers when > 0
+	prefetch bool          // prefetch setting applied to incoming receivers
+	rchan    chan Receiver // accepted server-side receivers
+	schan    chan Sender   // accepted server-side senders
+	auth     connectionSettings
+}
+
+// newPair builds a pair on top of two already-connected net.Conn ends: srv
+// becomes the server connection (Server() is prepended to serverOpts) and cli
+// the client connection, on which a session is opened. The default link
+// capacity is 100.
+//
+// A goroutine accepts everything arriving on the server connection. Incoming
+// receivers get the pair's capacity (when > 0) and prefetch settings and are
+// delivered on rchan; incoming senders are delivered on schan.
+//
+// Errors from NewConnection and Session are not checked here.
+// NOTE(review): presumably callers are expected to detect failures via
+// Sync() or on first use — confirm.
+func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []ConnectionOption) *pair {
+	srvOpts := append([]ConnectionOption{Server()}, serverOpts...)
+	serverConn, _ := NewConnection(srv, srvOpts...)
+	cliOpts := append([]ConnectionOption{}, clientOpts...)
+	clientConn, _ := NewConnection(cli, cliOpts...)
+	session, _ := clientConn.Session()
+
+	p := &pair{
+		t:        t,
+		client:   session,
+		server:   serverConn,
+		capacity: 100,
+		rchan:    make(chan Receiver),
+		schan:    make(chan Sender),
+	}
+
+	go func() {
+		for in := range p.server.Incoming() {
+			switch in := in.(type) {
+			case *IncomingReceiver:
+				if p.capacity > 0 {
+					in.SetCapacity(p.capacity)
+				}
+				in.SetPrefetch(p.prefetch)
+				p.rchan <- in.Accept().(Receiver)
+			case *IncomingSender:
+				p.schan <- in.Accept().(Sender)
+			default:
+				in.Accept()
+			}
+		}
+	}()
+
+	return p
+}
+
+// newPipe returns an AMQP pair whose client and server ends are joined by an
+// in-memory net.Pipe.
+func newPipe(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
+	clientEnd, serverEnd := net.Pipe()
+	return newPair(t, clientEnd, serverEnd, clientOpts, serverOpts)
+}
+
+// newSocketPair returns an AMQP pair whose client and server ends are joined
+// by a TCP connection on an ephemeral local port. Any listen, dial or accept
+// failure is fatal to the test.
+func newSocketPair(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
+	l, err := net.Listen("tcp4", ":0") // For systems with ipv6 disabled
+	fatalIfN(t, err, 1)
+	// Exactly one connection is accepted, so the listener is not needed after
+	// this function returns. Closing it also unblocks Accept if Dial fails.
+	defer l.Close()
+
+	type accepted struct {
+		conn net.Conn
+		err  error
+	}
+	// Buffered so the accepting goroutine can always deliver its result and
+	// exit, even if this function has already bailed out.
+	srvCh := make(chan accepted, 1)
+	go func() {
+		c, acceptErr := l.Accept()
+		srvCh <- accepted{conn: c, err: acceptErr}
+	}()
+
+	addr := l.Addr()
+	cli, err := net.Dial(addr.Network(), addr.String())
+	fatalIfN(t, err, 1)
+	srv := <-srvCh
+	if srv.err != nil {
+		cli.Close() // do not leak the client end when accept failed
+	}
+	fatalIfN(t, srv.err, 1)
+	return newPair(t, cli, srv.conn, clientOpts, serverOpts)
+}
+
+// close closes the client connection and then the server connection, both
+// with a nil error.
+func (p *pair) close() {
+	p.client.Connection().Close(nil)
+	p.server.Close(nil)
+}
+
+// sender creates a sender on the client session with opts and returns it
+// together with the matching server-side receiver taken from rchan. A failure
+// to create the sender is fatal to the test.
+func (p *pair) sender(opts ...LinkOption) (Sender, Receiver) {
+	clientSnd, err := p.client.Sender(opts...)
+	fatalIfN(p.t, err, 2)
+	return clientSnd, <-p.rchan
+}
+
+// receiver creates a receiver on the client session with opts and returns it
+// together with the matching server-side sender taken from schan. A failure
+// to create the receiver is fatal to the test.
+func (p *pair) receiver(opts ...LinkOption) (Receiver, Sender) {
+	clientRcv, err := p.client.Receiver(opts...)
+	fatalIfN(p.t, err, 2)
+	return clientRcv, <-p.schan
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/electron_test.go b/go/src/qpid.apache.org/electron/electron_test.go
index 74759f5..c8a51c7 100644
--- a/go/src/qpid.apache.org/electron/electron_test.go
+++ b/go/src/qpid.apache.org/electron/electron_test.go
@@ -21,119 +21,23 @@ package electron
import (
"fmt"
- "net"
- "path"
"qpid.apache.org/amqp"
- "reflect"
- "runtime"
"testing"
"time"
)
-func fatalIf(t *testing.T, err error) {
- if err != nil {
- _, file, line, ok := runtime.Caller(1) // annotate with location of caller.
- if ok {
- _, file = path.Split(file)
- }
- t.Fatalf("(from %s:%d) %v", file, line, err)
- }
-}
-
-func errorIf(t *testing.T, err error) {
- if err != nil {
- _, file, line, ok := runtime.Caller(1) // annotate with location of caller.
- if ok {
- _, file = path.Split(file)
- }
- t.Errorf("(from %s:%d) %v", file, line, err)
- }
-}
-
-func checkEqual(want interface{}, got interface{}) error {
- if !reflect.DeepEqual(want, got) {
- return fmt.Errorf("%#v != %#v", want, got)
- }
- return nil
-}
-
-// Start a server, return listening addr and channel for incoming Connections.
-func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr, <-chan Connection) {
- listener, err := net.Listen("tcp4", "") // For systems with ipv6 disabled
- fatalIf(t, err)
- addr := listener.Addr()
- ch := make(chan Connection)
- go func() {
- conn, err := listener.Accept()
- c, err := cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...)
- fatalIf(t, err)
- ch <- c
- }()
- return addr, ch
-}
-
-// Open a client connection and session, return the session.
-func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session {
- conn, err := net.Dial(addr.Network(), addr.String())
- fatalIf(t, err)
- // Don't bother checking error here, it's an async error so it's racy to do so anyway.
- // Let caller use Sync() or catch it on first use.
- c, _ := cont.Connection(conn, opts...)
- sn, _ := c.Session()
- return sn
-}
-
-// Return client and server ends of the same connection.
-func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (client Session, server Connection) {
- addr, ch := newServer(t, NewContainer("test-server"), sopts...)
- client = newClient(t, NewContainer("test-client"), addr, copts...)
- return client, <-ch
-}
-
-// Return client and server ends of the same connection.
-func newClientServer(t *testing.T) (client Session, server Connection) {
- return newClientServerOpts(t, nil, nil)
-}
-
-// Close client and server
-func closeClientServer(client Session, server Connection) {
- client.Connection().Close(nil)
- server.Close(nil)
-}
-
// Send a message one way with a client sender and server receiver, verify ack.
-func TestClientSendServerReceive(t *testing.T) {
+func TestClientSender(t *testing.T) {
+ p := newPipe(t, nil, nil)
+ defer func() { p.close() }()
+
nLinks := 3
nMessages := 3
- rchan := make(chan Receiver, nLinks)
- client, server := newClientServer(t)
- go func() {
- for in := range server.Incoming() {
- switch in := in.(type) {
- case *IncomingReceiver:
- in.SetCapacity(1)
- in.SetPrefetch(false)
- rchan <- in.Accept().(Receiver)
- default:
- in.Accept()
- }
- }
- }()
-
- defer func() { closeClientServer(client, server) }()
-
s := make([]Sender, nLinks)
- for i := 0; i < nLinks; i++ {
- var err error
- s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
- if err != nil {
- t.Fatal(err)
- }
- }
r := make([]Receiver, nLinks)
for i := 0; i < nLinks; i++ {
- r[i] = <-rchan
+ s[i], r[i] = p.sender(Target(fmt.Sprintf("foo%d", i)))
}
for i := 0; i < nLinks; i++ {
@@ -146,16 +50,12 @@ func TestClientSendServerReceive(t *testing.T) {
m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
var err error
s[i].SendAsync(m, ack, "testing")
- if err != nil {
- t.Fatal(err)
- }
+ fatalIf(t, err)
}()
// Server receive
rm, err := r[i].Receive()
- if err != nil {
- t.Fatal(err)
- }
+ fatalIf(t, err)
if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
t.Errorf("%#v != %#v", want, got)
}
@@ -182,76 +82,28 @@ func TestClientSendServerReceive(t *testing.T) {
func TestClientReceiver(t *testing.T) {
nMessages := 3
- client, server := newClientServer(t)
+ p := newPipe(t, nil, nil)
+ defer func() { p.close() }()
+ r, s := p.receiver(Source("foo"), Capacity(nMessages), Prefetch(true))
go func() {
- for in := range server.Incoming() {
- switch in := in.(type) {
- case *IncomingSender:
- s := in.Accept().(Sender)
- go func() {
- for i := int32(0); i < int32(nMessages); i++ {
- out := s.SendSync(amqp.NewMessageWith(i))
- if out.Error != nil {
- t.Error(out.Error)
- return
- }
- }
- s.Close(nil)
- }()
- default:
- in.Accept()
- }
+ for i := 0; i < nMessages; i++ { // Server sends
+ out := s.SendSync(amqp.NewMessageWith(int32(i)))
+ fatalIf(t, out.Error)
}
}()
-
- r, err := client.Receiver(Source("foo"))
- if err != nil {
- t.Fatal(err)
- }
- for i := int32(0); i < int32(nMessages); i++ {
+ for i := 0; i < nMessages; i++ { // Client receives
rm, err := r.Receive()
- if err != nil {
- if err != Closed {
- t.Error(err)
- }
- break
- }
- if err := rm.Accept(); err != nil {
- t.Error(err)
- }
- if b, ok := rm.Message.Body().(int32); !ok || b != i {
- t.Errorf("want %v, true got %v, %v", i, b, ok)
- }
+ fatalIf(t, err)
+ errorIf(t, checkEqual(int32(i), rm.Message.Body()))
+ errorIf(t, rm.Accept())
}
- server.Close(nil)
- client.Connection().Close(nil)
}
// Test timeout versions of waiting functions.
func TestTimeouts(t *testing.T) {
- var err error
- rchan := make(chan Receiver, 1)
- client, server := newClientServer(t)
- go func() {
- for i := range server.Incoming() {
- switch i := i.(type) {
- case *IncomingReceiver:
- i.SetCapacity(1)
- i.SetPrefetch(false)
- rchan <- i.Accept().(Receiver) // Issue credit only on receive
- default:
- i.Accept()
- }
- }
- }()
- defer func() { closeClientServer(client, server) }()
-
- // Open client sender
- snd, err := client.Sender(Target("test"))
- if err != nil {
- t.Fatal(err)
- }
- rcv := <-rchan
+ p := newPipe(t, nil, nil)
+ defer func() { p.close() }()
+ snd, rcv := p.sender(Target("test"))
// Test send with timeout
short := time.Millisecond
@@ -264,11 +116,11 @@ func TestTimeouts(t *testing.T) {
t.Error("want Timeout got", err)
}
// Test receive with timeout
- if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
+ if _, err := rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
t.Error("want Timeout got", err)
}
// Test receive with timeout
- if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
+ if _, err := rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
t.Error("want Timeout got", err)
}
// There is now a credit on the link due to receive
@@ -295,57 +147,6 @@ func TestTimeouts(t *testing.T) {
}
}
-// A server that returns the opposite end of each client link via channels.
-type pairs struct {
- t *testing.T
- client Session
- server Connection
- rchan chan Receiver
- schan chan Sender
- capacity int
- prefetch bool
-}
-
-func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
- p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
- p.client, p.server = newClientServer(t)
- go func() {
- for i := range p.server.Incoming() {
- switch i := i.(type) {
- case *IncomingReceiver:
- i.SetCapacity(capacity)
- i.SetPrefetch(prefetch)
- p.rchan <- i.Accept().(Receiver)
- case *IncomingSender:
- p.schan <- i.Accept().(Sender)
- default:
- i.Accept()
- }
- }
- }()
- return p
-}
-
-func (p *pairs) close() {
- closeClientServer(p.client, p.server)
-}
-
-// Return a client sender and server receiver
-func (p *pairs) senderReceiver() (Sender, Receiver) {
- snd, err := p.client.Sender()
- fatalIf(p.t, err)
- rcv := <-p.rchan
- return snd, rcv
-}
-
-// Return a client receiver and server sender
-func (p *pairs) receiverSender() (Receiver, Sender) {
- rcv, err := p.client.Receiver()
- fatalIf(p.t, err)
- snd := <-p.schan
- return rcv, snd
-}
-
type result struct {
label string
err error
@@ -370,8 +171,9 @@ func doDisposition(ack <-chan Outcome, results chan result) {
// Senders get credit immediately if receivers have prefetch set
func TestSendReceivePrefetch(t *testing.T) {
- pairs := newPairs(t, 1, true)
- s, r := pairs.senderReceiver()
+ p := newPipe(t, nil, nil)
+ p.prefetch = true
+ s, r := p.sender()
s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
if _, err := r.Receive(); err != nil {
t.Error(err)
@@ -380,8 +182,9 @@ func TestSendReceivePrefetch(t *testing.T) {
// Senders do not get credit till Receive() if receivers don't have prefetch
func TestSendReceiveNoPrefetch(t *testing.T) {
- pairs := newPairs(t, 1, false)
- s, r := pairs.senderReceiver()
+ p := newPipe(t, nil, nil)
+ p.prefetch = false
+ s, r := p.sender()
done := make(chan struct{}, 1)
go func() {
s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
@@ -402,14 +205,14 @@ func TestSendReceiveNoPrefetch(t *testing.T) {
// Test that closing Links interrupts blocked link functions.
func TestLinkCloseInterrupt(t *testing.T) {
want := amqp.Error{Name: "x", Description: "all bad"}
- pairs := newPairs(t, 1, false)
+ p := newPipe(t, nil, nil)
results := make(chan result) // Collect expected errors
// Note closing the link does not interrupt Send() calls, the AMQP spec says
// that deliveries can be settled after the link is closed.
// Receiver.Close() interrupts Receive()
- snd, rcv := pairs.senderReceiver()
+ snd, rcv := p.sender()
go doReceive(rcv, results)
rcv.Close(want)
if r := <-results; want != r.err {
@@ -417,7 +220,7 @@ func TestLinkCloseInterrupt(t *testing.T) {
}
// Remote Sender.Close() interrupts Receive()
- snd, rcv = pairs.senderReceiver()
+ snd, rcv = p.sender()
go doReceive(rcv, results)
snd.Close(want)
if r := <-results; want != r.err {
@@ -428,27 +231,28 @@ func TestLinkCloseInterrupt(t *testing.T) {
// Test closing the server end of a connection.
func TestConnectionCloseInterrupt1(t *testing.T) {
want := amqp.Error{Name: "x", Description: "bad"}
- pairs := newPairs(t, 1, true)
+ p := newSocketPair(t, nil, nil)
+ p.prefetch = true
results := make(chan result) // Collect expected errors
// Connection.Close() interrupts Send, Receive, Disposition.
- snd, rcv := pairs.senderReceiver()
+ snd, rcv := p.sender()
go doSend(snd, results)
if _, err := rcv.Receive(); err != nil {
t.Error("receive", err)
}
- rcv, snd = pairs.receiverSender()
+ rcv, snd = p.receiver()
go doReceive(rcv, results)
- snd, rcv = pairs.senderReceiver()
+ snd, rcv = p.sender()
ack := snd.SendWaitable(amqp.NewMessage())
if _, err := rcv.Receive(); err != nil {
t.Error("receive", err)
}
go doDisposition(ack, results)
- pairs.server.Close(want)
+ p.server.Close(want)
for i := 0; i < 3; i++ {
if r := <-results; want != r.err {
t.Errorf("want %v got %v", want, r)
@@ -459,24 +263,25 @@ func TestConnectionCloseInterrupt1(t *testing.T) {
// Test closing the client end of the connection.
func TestConnectionCloseInterrupt2(t *testing.T) {
want := amqp.Error{Name: "x", Description: "bad"}
- pairs := newPairs(t, 1, true)
+ p := newSocketPair(t, nil, nil)
+ p.prefetch = true
results := make(chan result) // Collect expected errors
// Connection.Close() interrupts Send, Receive, Disposition.
- snd, rcv := pairs.senderReceiver()
+ snd, rcv := p.sender()
go doSend(snd, results)
if _, err := rcv.Receive(); err != nil {
t.Error("receive", err)
}
- rcv, snd = pairs.receiverSender()
+ rcv, snd = p.receiver()
go doReceive(rcv, results)
- snd, rcv = pairs.senderReceiver()
+ snd, rcv = p.sender()
ack := snd.SendWaitable(amqp.NewMessage())
go doDisposition(ack, results)
- pairs.client.Connection().Close(want)
+ p.client.Connection().Close(want)
for i := 0; i < 3; i++ {
if r := <-results; want != r.err {
t.Errorf("want %v got %v", want, r.err)
@@ -484,63 +289,43 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
}
}
-func heartbeat(c Connection) time.Duration {
- return c.(*connection).engine.Transport().RemoteIdleTimeout()
-}
-
func TestHeartbeat(t *testing.T) {
- client, server := newClientServerOpts(t,
- []ConnectionOption{Heartbeat(102 * time.Millisecond)},
- nil)
- defer closeClientServer(client, server)
-
- var serverHeartbeat time.Duration
-
- go func() {
- for in := range server.Incoming() {
- switch in := in.(type) {
- case *IncomingConnection:
- serverHeartbeat = in.Heartbeat()
- in.AcceptConnection(Heartbeat(101 * time.Millisecond))
- default:
- in.Accept()
- }
- }
- }()
+ p := newSocketPair(t,
+ []ConnectionOption{Heartbeat(12 * time.Millisecond)},
+ []ConnectionOption{Heartbeat(11 * time.Millisecond)})
+ defer func() { p.close() }()
- // Freeze the server to stop it sending heartbeats.
+ // Function to freeze the server to stop it sending heartbeats.
unfreeze := make(chan bool)
defer close(unfreeze)
- freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) }
+ freeze := func() error { return p.server.(*connection).engine.Inject(func() { <-unfreeze }) }
- fatalIf(t, client.Sync())
- errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection())))
- errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat))
- errorIf(t, client.Connection().Error())
+ fatalIf(t, p.client.Sync())
+ errorIf(t, checkEqual(11*time.Millisecond, p.client.Connection().Heartbeat()))
+ errorIf(t, checkEqual(12*time.Millisecond, p.server.Heartbeat()))
// Freeze the server for less than a heartbeat
fatalIf(t, freeze())
- time.Sleep(50 * time.Millisecond)
+ time.Sleep(5 * time.Millisecond)
unfreeze <- true
// Make sure server is still responding.
- s, err := client.Sender()
- errorIf(t, err)
+ s, _ := p.sender()
errorIf(t, s.Sync())
- // Freeze the server till the client times out the connection
+ // Freeze the server till the client times out the connection
fatalIf(t, freeze())
select {
- case <-client.Done():
- if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name {
- t.Error("bad timeout error:", client.Error())
+ case <-p.client.Done():
+ if amqp.ResourceLimitExceeded != p.client.Error().(amqp.Error).Name {
+ t.Error("bad timeout error:", p.client.Error())
}
case <-time.After(400 * time.Millisecond):
t.Error("connection failed to time out")
}
unfreeze <- true // Unfreeze the server
- <-server.Done()
- if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name {
- t.Error("bad timeout error:", server.Error())
+ <-p.server.Done()
+ if amqp.ResourceLimitExceeded != p.server.Error().(amqp.Error).Name {
+ t.Error("bad timeout error:", p.server.Error())
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org