You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/10/11 22:21:33 UTC
[01/12] qpid-proton git commit: PROTON-1953: [go] occasional
client/server hang with high volume of messages
Repository: qpid-proton
Updated Branches:
refs/heads/master e769f784e -> 4a9f3b986
PROTON-1953: [go] occasional client/server hang with high volume of messages
Due to race/deadlock in read/write and engine main goroutine. Simplified:
- start read/write goroutines as needed
- handle read/write completion via Inject(), no special channels
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/486fbaf0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/486fbaf0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/486fbaf0
Branch: refs/heads/master
Commit: 486fbaf034f2d89688bb678914d798a1a1595cc5
Parents: e769f78
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Oct 10 16:09:33 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 11:16:37 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/proton/engine.go | 173 ++++++++++-----------------
1 file changed, 64 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/486fbaf0/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/engine.go b/go/src/qpid.apache.org/proton/engine.go
index d28a09f..af26a5f 100644
--- a/go/src/qpid.apache.org/proton/engine.go
+++ b/go/src/qpid.apache.org/proton/engine.go
@@ -103,15 +103,16 @@ type Engine struct {
err ErrorHolder
inject chan func()
- conn net.Conn
- connection Connection
- transport Transport
- collector *C.pn_collector_t
- handlers []EventHandler // Handlers for proton events.
- running chan struct{} // This channel will be closed when the goroutines are done.
- closeOnce sync.Once
- timer *time.Timer
- traceEvent bool
+ conn net.Conn
+ connection Connection
+ transport Transport
+ collector *C.pn_collector_t
+ handlers []EventHandler // Handlers for proton events.
+ running chan struct{} // This channel will be closed when the goroutines are done.
+ closeOnce sync.Once
+ timer *time.Timer
+ traceEvent bool
+ reading, writing bool
}
const bufferSize = 4096
@@ -255,6 +256,12 @@ func (eng *Engine) Disconnect(err error) {
<-eng.running
}
+// For debugging purposes: like Transport.Log() but takes a format string
+// and works even if the transport has been freed.
+func (eng *Engine) log(format string, args ...interface{}) {
+ fmt.Fprintf(os.Stderr, "[%p]: %v", eng.transport, fmt.Sprintf(format, args...))
+}
+
// Let proton run timed activity and set up the next tick
func (eng *Engine) tick() {
now := time.Now()
@@ -281,16 +288,50 @@ func (eng *Engine) dispatch() bool {
return !eng.transport.Closed() || C.pn_collector_peek(eng.collector) != nil
}
-func (eng *Engine) writeBuffer() []byte {
- size := eng.Transport().Pending() // Evaluate before Head(), may change buffer.
- start := eng.Transport().Head()
- return cByteSlice(start, size)
+func (eng *Engine) write() {
+ if !eng.writing {
+ size := eng.Transport().Pending() // Evaluate before Head(), may change buffer.
+ start := eng.Transport().Head()
+ if size > 0 {
+ eng.writing = true
+ go func() { // Blocking Write() in separate goroutine
+ n, err := eng.conn.Write(cByteSlice(start, size))
+ eng.Inject(func() { // Inject results of Write back to engine goroutine
+ eng.writing = false
+ if n > 0 {
+ eng.transport.Pop(uint(n))
+ }
+ if err != nil {
+ eng.Transport().Condition().SetError(err)
+ eng.Transport().CloseHead()
+ }
+ })
+ }()
+ }
+ }
}
-func (eng *Engine) readBuffer() []byte {
- size := eng.Transport().Capacity()
- start := eng.Transport().Tail()
- return cByteSlice(start, size)
+func (eng *Engine) read() {
+ if !eng.reading {
+ size := eng.Transport().Capacity()
+ start := eng.Transport().Tail()
+ if size > 0 {
+ eng.reading = true
+ go func() { // Blocking Read in separate goroutine
+ n, err := eng.conn.Read(cByteSlice(start, size))
+ eng.Inject(func() {
+ eng.reading = false
+ if n > 0 {
+ eng.Transport().Process(uint(n))
+ }
+ if err != nil {
+ eng.Transport().Condition().SetError(err)
+ eng.Transport().CloseTail()
+ }
+ })
+ }()
+ }
+ }
}
func (eng *Engine) free() {
@@ -317,106 +358,20 @@ func (eng *Engine) Run() error {
defer eng.free()
eng.transport.Bind(eng.connection)
eng.tick() // Start ticking if needed
-
- // Channels for read and write buffers going in and out of the read/write goroutines.
- // The channels are unbuffered: we want to exchange buffers in sequence.
- readsIn, writesIn := make(chan []byte), make(chan []byte)
- readsOut, writesOut := make(chan []byte), make(chan []byte)
-
- wait := sync.WaitGroup{}
- wait.Add(2) // Read and write goroutines
-
- go func() { // Read goroutine
- defer wait.Done()
- for {
- rbuf, ok := <-readsIn
- if !ok {
- return
- }
- n, err := eng.conn.Read(rbuf)
- if n > 0 {
- readsOut <- rbuf[:n]
- } else if err != nil {
- _ = eng.Inject(func() {
- eng.Transport().Condition().SetError(err)
- eng.Transport().CloseTail()
- })
- return
- }
- }
- }()
-
- go func() { // Write goroutine
- defer wait.Done()
- for {
- wbuf, ok := <-writesIn
- if !ok {
- return
- }
- n, err := eng.conn.Write(wbuf)
- if n > 0 {
- writesOut <- wbuf[:n]
- } else if err != nil {
- _ = eng.Inject(func() {
- eng.Transport().Condition().SetError(err)
- eng.Transport().CloseHead()
- })
- return
- }
- }
- }()
-
for eng.dispatch() {
- readBuf := eng.readBuffer()
- writeBuf := eng.writeBuffer()
- // Note that getting the buffers can generate events (eg. SASL events) that
- // might close the transport. Check if we are already finished before
- // blocking for IO.
- if !eng.dispatch() {
- break
- }
-
- // sendReads/sendWrites are nil (not sendable in select) unless we have a
- // buffer to read/write
- var sendReads, sendWrites chan []byte
- if readBuf != nil {
- sendReads = readsIn
- }
- if writeBuf != nil {
- sendWrites = writesIn
- }
-
- // Send buffers to the read/write goroutines if we have them.
- // Get buffers from the read/write goroutines and process them
- // Check for injected functions
+ // Initiate read/write if needed
+ eng.read()
+ eng.write()
select {
-
- case sendReads <- readBuf:
-
- case sendWrites <- writeBuf:
-
- case buf := <-readsOut:
- eng.transport.Process(uint(len(buf)))
-
- case buf := <-writesOut:
- eng.transport.Pop(uint(len(buf)))
-
- case f, ok := <-eng.inject: // Function injected from another goroutine
- if ok {
- f()
- }
-
+ case f := <-eng.inject: // User or IO action
+ f()
case <-eng.timer.C:
eng.tick()
}
}
-
eng.err.Set(EndpointError(eng.Connection()))
eng.err.Set(eng.Transport().Condition().Error())
- close(readsIn)
- close(writesIn)
close(eng.running) // Signal goroutines have exited and Error is set, disable Inject()
- _ = eng.conn.Close() // Close conn, force read/write goroutines to exit (they will Inject)
- wait.Wait() // Wait for goroutines
+ _ = eng.conn.Close() // Close conn, force read goroutine to exit (Inject will fail)
return eng.err.Get()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[05/12] qpid-proton git commit: PROTON-1954: [go] Container should
default to random container-id
Posted by ac...@apache.org.
PROTON-1954: [go] Container should default to random container-id
- default to random container-id
- add ContainerId option to set container ID without a Container object
- set remote heartbeat on incoming connection settings
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c2491078
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c2491078
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c2491078
Branch: refs/heads/master
Commit: c24910781489de19669384180d96a23ca5a4fbcd
Parents: d9b4b98
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Sep 20 20:53:14 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:33:08 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/electron/connection.go | 24 ++++++++++++++++++----
1 file changed, 20 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c2491078/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go
index 295dd50..0d5e7c5 100644
--- a/go/src/qpid.apache.org/electron/connection.go
+++ b/go/src/qpid.apache.org/electron/connection.go
@@ -23,6 +23,8 @@ package electron
import "C"
import (
+ "crypto/rand"
+ "encoding/hex"
"net"
"qpid.apache.org/proton"
"sync"
@@ -106,7 +108,7 @@ func (c connectionSettings) User() string { return c.user }
func (c connectionSettings) VirtualHost() string { return c.virtualHost }
func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
-// ConnectionOption can be passed when creating a connection to configure various options
+// ConnectionOption arguments can be passed when creating a connection to configure it.
type ConnectionOption func(*connection)
// User returns a ConnectionOption sets the user name for a connection
@@ -160,6 +162,12 @@ func Parent(cont Container) ConnectionOption {
return func(c *connection) { c.container = cont.(*container) }
}
+// ContainerId returns a ConnectionOption that creates a new Container
+// with id and associates it with the connection
+func ContainerId(id string) ConnectionOption {
+ return func(c *connection) { c.container = NewContainer(id).(*container) }
+}
+
type connection struct {
endpoint
connectionSettings
@@ -178,6 +186,7 @@ type connection struct {
}
// NewConnection creates a connection with the given options.
+// Options are applied in order.
func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) {
c := &connection{
conn: conn,
@@ -193,7 +202,12 @@ func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)
set(c)
}
if c.container == nil {
- c.container = NewContainer("").(*container)
+ // Generate a random container-id. Not an RFC4122-compliant UUID but probably-unique
+ id := make([]byte, 16)
+ if _, err = rand.Read(id); err != nil {
+ return nil, err
+ }
+ c.container = NewContainer(hex.EncodeToString(id)).(*container)
}
c.pConnection.SetContainer(c.container.Id())
saslConfig.setup(c.engine)
@@ -294,14 +308,16 @@ type IncomingConnection struct {
func newIncomingConnection(c *connection) *IncomingConnection {
c.user = c.pConnection.Transport().User()
c.virtualHost = c.pConnection.RemoteHostname()
+ c.heartbeat = c.pConnection.Transport().RemoteIdleTimeout()
return &IncomingConnection{
incoming: makeIncoming(c.pConnection),
connectionSettings: c.connectionSettings,
c: c}
}
-// AcceptConnection is like Accept() but takes ConnectionOption s
-// For example you can set the Heartbeat() for the accepted connection.
+// AcceptConnection is like Accept() but takes ConnectionOption
+// arguments like NewConnection(). For example you can set the
+// Heartbeat() for the incoming connection.
func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection {
return in.accept(func() Endpoint {
for _, opt := range opts {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[02/12] qpid-proton git commit: PROTON-1910: [go] test refactor and
benchmarks
Posted by ac...@apache.org.
PROTON-1910: [go] test refactor and benchmarks
- Simplify common test tools, move to common_test.go
- Use net.Pipe for most tests, more efficient than a full network socket
- Added simple benchmarks
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d9b4b989
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d9b4b989
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d9b4b989
Branch: refs/heads/master
Commit: d9b4b9893412676e947d693a7d4e336fad4d4110
Parents: 486fbaf
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Sep 21 00:51:45 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:32:59 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/amqp/message_test.go | 138 ++++++--
go/src/qpid.apache.org/electron/auth_test.go | 39 +--
.../qpid.apache.org/electron/benchmark_test.go | 132 ++++++++
go/src/qpid.apache.org/electron/common_test.go | 148 ++++++++
.../qpid.apache.org/electron/electron_test.go | 337 ++++---------------
5 files changed, 459 insertions(+), 335 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go
index 663e82f..b22c60d 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -20,6 +20,7 @@ under the License.
package amqp
import (
+ "reflect"
"testing"
"time"
)
@@ -28,9 +29,7 @@ func roundTrip(m Message) error {
var err error
if buffer, err := m.Encode(nil); err == nil {
if m2, err := DecodeMessage(buffer); err == nil {
- if err = checkEqual(m, m2); err == nil {
- err = checkEqual(m.String(), m2.String())
- }
+ err = checkEqual(m, m2)
}
}
return err
@@ -38,35 +37,40 @@ func roundTrip(m Message) error {
func TestDefaultMessage(t *testing.T) {
m := NewMessage()
+ mv := reflect.ValueOf(m)
// Check defaults
- for _, data := range [][]interface{}{
- {m.Inferred(), false},
- {m.Durable(), false},
- {m.Priority(), uint8(4)},
- {m.TTL(), time.Duration(0)},
- {m.UserId(), ""},
- {m.Address(), ""},
- {m.Subject(), ""},
- {m.ReplyTo(), ""},
- {m.ContentType(), ""},
- {m.ContentEncoding(), ""},
- {m.GroupId(), ""},
- {m.GroupSequence(), int32(0)},
- {m.ReplyToGroupId(), ""},
- {m.MessageId(), nil},
- {m.CorrelationId(), nil},
- {m.DeliveryAnnotations(), map[AnnotationKey]interface{}(nil)},
- {m.MessageAnnotations(), map[AnnotationKey]interface{}(nil)},
- {m.ApplicationProperties(), map[string]interface{}(nil)},
+ for _, x := range []struct {
+ method string
+ want interface{}
+ }{
+ {"Inferred", false},
+ {"Durable", false},
+ {"Priority", uint8(4)},
+ {"TTL", time.Duration(0)},
+ {"UserId", ""},
+ {"Address", ""},
+ {"Subject", ""},
+ {"ReplyTo", ""},
+ {"ContentType", ""},
+ {"ContentEncoding", ""},
+ {"GroupId", ""},
+ {"GroupSequence", int32(0)},
+ {"ReplyToGroupId", ""},
+ {"MessageId", nil},
+ {"CorrelationId", nil},
+ {"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
+ {"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
+ {"ApplicationProperties", map[string]interface{}(nil)},
// Deprecated
- {m.Instructions(), map[string]interface{}(nil)},
- {m.Annotations(), map[string]interface{}(nil)},
- {m.Properties(), map[string]interface{}(nil)},
- {m.Body(), nil},
+ {"Instructions", map[string]interface{}(nil)},
+ {"Annotations", map[string]interface{}(nil)},
+ {"Properties", map[string]interface{}(nil)},
+ {"Body", nil},
} {
- if err := checkEqual(data[0], data[1]); err != nil {
- t.Error(err)
+ ret := mv.MethodByName(x.method).Call(nil)
+ if err := checkEqual(x.want, ret[0].Interface()); err != nil {
+ t.Errorf("%s: %s", x.method, err)
}
}
if err := roundTrip(m); err != nil {
@@ -84,14 +88,17 @@ func TestMessageString(t *testing.T) {
m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"})
m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"})
m.SetApplicationProperties(map[string]interface{}{"int": int32(32)})
+ if err := roundTrip(m); err != nil {
+ t.Error(err)
+ }
msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
if err := checkEqual(msgstr, m.String()); err != nil {
t.Error(err)
}
}
-func TestMessageRoundTrip(t *testing.T) {
- m := NewMessage()
+// Set all message properties
+func setMessageProperties(m Message) Message {
m.SetInferred(false)
m.SetDurable(true)
m.SetPriority(42)
@@ -110,7 +117,25 @@ func TestMessageRoundTrip(t *testing.T) {
m.SetDeliveryAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("instructions"): "foo"})
m.SetMessageAnnotations(map[AnnotationKey]interface{}{AnnotationKeySymbol("annotations"): "bar"})
m.SetApplicationProperties(map[string]interface{}{"int": int32(32), "bool": true})
- m.Marshal("hello")
+ return m
+}
+
+func TestMessageRoundTrip(t *testing.T) {
+ m1 := NewMessage()
+ setMessageProperties(m1)
+ m1.Marshal("hello")
+
+ buffer, err := m1.Encode(nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ m, err := DecodeMessage(buffer)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err = checkEqual(m1, m); err != nil {
+ t.Error(err)
+ }
for _, data := range [][]interface{}{
{m.Inferred(), false},
@@ -215,3 +240,54 @@ func TestMessageBodyTypes(t *testing.T) {
// TODO aconway 2015-09-08: array etc.
}
+
+// Benchmarks assign to package-scope variables to prevent being optimized out.
+var bmM Message
+var bmBuf []byte
+
+func BenchmarkNewMessageEmpty(b *testing.B) {
+ for n := 0; n < b.N; n++ {
+ bmM = NewMessage()
+ }
+}
+
+func BenchmarkNewMessageString(b *testing.B) {
+ for n := 0; n < b.N; n++ {
+ bmM = NewMessageWith("hello")
+ }
+}
+
+func BenchmarkNewMessageAll(b *testing.B) {
+ for n := 0; n < b.N; n++ {
+ bmM = setMessageProperties(NewMessageWith("hello"))
+ }
+}
+
+func BenchmarkEncode(b *testing.B) {
+ m := setMessageProperties(NewMessageWith("hello"))
+ var buf []byte
+ b.ResetTimer()
+ for n := 0; n < b.N; n++ {
+ buf, err := m.Encode(buf)
+ if err != nil {
+ b.Fatal(err)
+ }
+ bmBuf = buf
+ }
+}
+
+func BenchmarkDecode(b *testing.B) {
+ var buf []byte
+ buf, err := setMessageProperties(NewMessageWith("hello")).Encode(buf)
+ if err != nil {
+ b.Fatal(err)
+ }
+ m := NewMessage()
+ b.ResetTimer()
+ for n := 0; n < b.N; n++ {
+ if err := m.Decode(buf); err != nil {
+ b.Fatal(err)
+ }
+ bmM = m
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/auth_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/auth_test.go b/go/src/qpid.apache.org/electron/auth_test.go
index 162b366..30ef401 100644
--- a/go/src/qpid.apache.org/electron/auth_test.go
+++ b/go/src/qpid.apache.org/electron/auth_test.go
@@ -29,57 +29,40 @@ import (
"testing"
)
-func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) {
- client, server := newClientServerOpts(t, copts, sopts)
- defer closeClientServer(client, server)
-
- go func() {
- for in := range server.Incoming() {
- switch in := in.(type) {
- case *IncomingConnection:
- got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()}
- }
- in.Accept()
- }
- }()
-
- err = client.Sync()
- return
-}
-
func TestAuthAnonymous(t *testing.T) {
- got, err := testAuthClientServer(t,
+ p := newPipe(t,
[]ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
[]ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
- fatalIf(t, err)
- errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got))
+ fatalIf(t, p.server.Sync())
+ errorIf(t, checkEqual("anonymous", p.server.User()))
+ errorIf(t, checkEqual("vhost", p.server.VirtualHost()))
}
func TestAuthPlain(t *testing.T) {
extendedSASL.startTest(t)
- got, err := testAuthClientServer(t,
+ p := newPipe(t,
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
- fatalIf(t, err)
- errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
+ fatalIf(t, p.server.Sync())
+ errorIf(t, checkEqual("fred@proton", p.server.User()))
}
func TestAuthBadPass(t *testing.T) {
extendedSASL.startTest(t)
- _, err := testAuthClientServer(t,
+ p := newPipe(t,
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
- if err == nil {
+ if p.server.Sync() == nil {
t.Error("Expected auth failure for bad pass")
}
}
func TestAuthBadUser(t *testing.T) {
extendedSASL.startTest(t)
- _, err := testAuthClientServer(t,
+ p := newPipe(t,
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
[]ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
- if err == nil {
+ if p.server.Sync() == nil {
t.Error("Expected auth failure for bad user")
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/benchmark_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/benchmark_test.go b/go/src/qpid.apache.org/electron/benchmark_test.go
new file mode 100644
index 0000000..ae9d47c
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/benchmark_test.go
@@ -0,0 +1,132 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "flag"
+ "strings"
+ "sync"
+ "testing"
+
+ "qpid.apache.org/amqp"
+)
+
+// To change capacity use
+// go test -bench=. -args -capacity 100
+var capacity = flag.Int("capacity", 1000, "Prefetch capacity")
+var bodySize = flag.Int("bodySize", 1000, "Message body size")
+
+type bmCommon struct {
+ b *testing.B
+ p *pair
+ s Sender
+ r Receiver
+ ack chan Outcome
+ done sync.WaitGroup
+}
+
+func makeBmCommon(p *pair, waitCount int) bmCommon {
+ bm := bmCommon{p: p, b: p.t.(*testing.B)}
+ bm.p.capacity = *capacity
+ bm.p.prefetch = true
+ bm.s, bm.r = p.sender()
+ bm.ack = make(chan Outcome, *capacity)
+ bm.done.Add(waitCount)
+ bm.b.ResetTimer()
+ return bm
+}
+
+func (bm *bmCommon) receiveAccept() {
+ defer bm.done.Done()
+ for n := 0; n < bm.b.N; n++ {
+ if rm, err := bm.r.Receive(); err != nil {
+ bm.b.Fatal(err)
+ } else {
+ fatalIf(bm.b, rm.Accept())
+ }
+ }
+}
+
+func (bm *bmCommon) outcomes() {
+ defer bm.done.Done()
+ for n := 0; n < bm.b.N; n++ {
+ fatalIf(bm.b, (<-bm.ack).Error)
+ }
+}
+
+var emptyMsg = amqp.NewMessage()
+
+func BenchmarkSendForget(b *testing.B) {
+ bm := makeBmCommon(newPipe(b, nil, nil), 1)
+ defer bm.p.close()
+
+ go func() { // Receive, no ack
+ defer bm.done.Done()
+ for n := 0; n < b.N; n++ {
+ if _, err := bm.r.Receive(); err != nil {
+ b.Fatal(err)
+ }
+ }
+ }()
+
+ for n := 0; n < b.N; n++ {
+ bm.s.SendForget(emptyMsg)
+ }
+ bm.done.Wait()
+}
+
+func BenchmarkSendSync(b *testing.B) {
+ bm := makeBmCommon(newPipe(b, nil, nil), 1)
+ defer bm.p.close()
+
+ go bm.receiveAccept()
+ for n := 0; n < b.N; n++ {
+ fatalIf(b, bm.s.SendSync(emptyMsg).Error)
+ }
+ bm.done.Wait()
+}
+
+func BenchmarkSendAsync(b *testing.B) {
+ bm := makeBmCommon(newPipe(b, nil, nil), 2)
+ defer bm.p.close()
+
+ go bm.outcomes() // Handle outcomes
+ go bm.receiveAccept() // Receive
+ for n := 0; n < b.N; n++ {
+ bm.s.SendAsync(emptyMsg, bm.ack, nil)
+ }
+ bm.done.Wait()
+}
+
+// Create a new message for each send, with body and property.
+func BenchmarkSendAsyncNewMessage(b *testing.B) {
+ body := strings.Repeat("x", *bodySize)
+ bm := makeBmCommon(newPipe(b, nil, nil), 2)
+ defer bm.p.close()
+
+ go bm.outcomes() // Handle outcomes
+ go bm.receiveAccept() // Receive
+ for n := 0; n < b.N; n++ {
+ msg := amqp.NewMessageWith(body)
+ msg.SetApplicationProperties(map[string]interface{}{"prop": "value"})
+ bm.s.SendAsync(msg, bm.ack, nil)
+ }
+ bm.done.Wait()
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/common_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go
new file mode 100644
index 0000000..3aad825
--- /dev/null
+++ b/go/src/qpid.apache.org/electron/common_test.go
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "fmt"
+ "net"
+ "path"
+ "reflect"
+ "runtime"
+ "testing"
+)
+
+func decorate(err error, callDepth int) string {
+ _, file, line, _ := runtime.Caller(callDepth + 1) // annotate with location of caller.
+ _, file = path.Split(file)
+ return fmt.Sprintf("\n%s:%d: %v", file, line, err)
+}
+
+func fatalIfN(t testing.TB, err error, callDepth int) {
+ if err != nil {
+ t.Fatal(decorate(err, callDepth+1))
+ }
+}
+
+func fatalIf(t testing.TB, err error) {
+ fatalIfN(t, err, 1)
+}
+
+func errorIf(t testing.TB, err error) {
+ if err != nil {
+ t.Errorf(decorate(err, 1))
+ }
+}
+
+func checkEqual(want interface{}, got interface{}) error {
+ if !reflect.DeepEqual(want, got) {
+ return fmt.Errorf("(%#v != %#v)", want, got)
+ }
+ return nil
+}
+
+// AMQP client/server pair
+type pair struct {
+ t testing.TB
+ client Session
+ server Connection
+ capacity int
+ prefetch bool
+ rchan chan Receiver
+ schan chan Sender
+ auth connectionSettings
+}
+
+func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []ConnectionOption) *pair {
+ opts := append([]ConnectionOption{Server()}, serverOpts...)
+ sc, _ := NewConnection(srv, opts...)
+ opts = append([]ConnectionOption{}, clientOpts...)
+ cc, _ := NewConnection(cli, opts...)
+ cs, _ := cc.Session()
+ p := &pair{
+ t: t,
+ client: cs,
+ server: sc,
+ capacity: 100,
+ rchan: make(chan Receiver),
+ schan: make(chan Sender)}
+
+ go func() {
+ for i := range p.server.Incoming() {
+ switch i := i.(type) {
+ case *IncomingReceiver:
+ if p.capacity > 0 {
+ i.SetCapacity(p.capacity)
+ }
+ i.SetPrefetch(p.prefetch)
+ p.rchan <- i.Accept().(Receiver)
+ break
+ case *IncomingSender:
+ p.schan <- i.Accept().(Sender)
+ default:
+ i.Accept()
+ }
+ }
+ }()
+
+ return p
+}
+
+// AMQP pair linked by in-memory pipe
+func newPipe(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
+ cli, srv := net.Pipe()
+ return newPair(t, cli, srv, clientOpts, serverOpts)
+}
+
+// AMQP pair linked by TCP socket
+func newSocketPair(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
+ l, err := net.Listen("tcp4", ":0") // For systems with ipv6 disabled
+ fatalIfN(t, err, 1)
+ srvCh := make(chan net.Conn)
+ var srvErr error
+ go func() {
+ var c net.Conn
+ c, srvErr = l.Accept()
+ srvCh <- c
+ }()
+ addr := l.Addr()
+ cli, err := net.Dial(addr.Network(), addr.String())
+ fatalIfN(t, err, 1)
+ srv := <-srvCh
+ fatalIfN(t, srvErr, 1)
+ return newPair(t, cli, srv, clientOpts, serverOpts)
+}
+
+func (p *pair) close() { p.client.Connection().Close(nil); p.server.Close(nil) }
+
+// Return a client sender and server receiver
+func (p *pair) sender(opts ...LinkOption) (Sender, Receiver) {
+ snd, err := p.client.Sender(opts...)
+ fatalIfN(p.t, err, 2)
+ rcv := <-p.rchan
+ return snd, rcv
+}
+
+// Return a client receiver and server sender
+func (p *pair) receiver(opts ...LinkOption) (Receiver, Sender) {
+ rcv, err := p.client.Receiver(opts...)
+ fatalIfN(p.t, err, 2)
+ snd := <-p.schan
+ return rcv, snd
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d9b4b989/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/electron_test.go b/go/src/qpid.apache.org/electron/electron_test.go
index 74759f5..c8a51c7 100644
--- a/go/src/qpid.apache.org/electron/electron_test.go
+++ b/go/src/qpid.apache.org/electron/electron_test.go
@@ -21,119 +21,23 @@ package electron
import (
"fmt"
- "net"
- "path"
"qpid.apache.org/amqp"
- "reflect"
- "runtime"
"testing"
"time"
)
-func fatalIf(t *testing.T, err error) {
- if err != nil {
- _, file, line, ok := runtime.Caller(1) // annotate with location of caller.
- if ok {
- _, file = path.Split(file)
- }
- t.Fatalf("(from %s:%d) %v", file, line, err)
- }
-}
-
-func errorIf(t *testing.T, err error) {
- if err != nil {
- _, file, line, ok := runtime.Caller(1) // annotate with location of caller.
- if ok {
- _, file = path.Split(file)
- }
- t.Errorf("(from %s:%d) %v", file, line, err)
- }
-}
-
-func checkEqual(want interface{}, got interface{}) error {
- if !reflect.DeepEqual(want, got) {
- return fmt.Errorf("%#v != %#v", want, got)
- }
- return nil
-}
-
-// Start a server, return listening addr and channel for incoming Connections.
-func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr, <-chan Connection) {
- listener, err := net.Listen("tcp4", "") // For systems with ipv6 disabled
- fatalIf(t, err)
- addr := listener.Addr()
- ch := make(chan Connection)
- go func() {
- conn, err := listener.Accept()
- c, err := cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...)
- fatalIf(t, err)
- ch <- c
- }()
- return addr, ch
-}
-
-// Open a client connection and session, return the session.
-func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session {
- conn, err := net.Dial(addr.Network(), addr.String())
- fatalIf(t, err)
- // Don't bother checking error here, it's an async error so it's racy to do so anyway.
- // Let caller use Sync() or catch it on first use.
- c, _ := cont.Connection(conn, opts...)
- sn, _ := c.Session()
- return sn
-}
-
-// Return client and server ends of the same connection.
-func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (client Session, server Connection) {
- addr, ch := newServer(t, NewContainer("test-server"), sopts...)
- client = newClient(t, NewContainer("test-client"), addr, copts...)
- return client, <-ch
-}
-
-// Return client and server ends of the same connection.
-func newClientServer(t *testing.T) (client Session, server Connection) {
- return newClientServerOpts(t, nil, nil)
-}
-
-// Close client and server
-func closeClientServer(client Session, server Connection) {
- client.Connection().Close(nil)
- server.Close(nil)
-}
-
// Send a message one way with a client sender and server receiver, verify ack.
-func TestClientSendServerReceive(t *testing.T) {
+func TestClientSender(t *testing.T) {
+ p := newPipe(t, nil, nil)
+ defer func() { p.close() }()
+
nLinks := 3
nMessages := 3
- rchan := make(chan Receiver, nLinks)
- client, server := newClientServer(t)
- go func() {
- for in := range server.Incoming() {
- switch in := in.(type) {
- case *IncomingReceiver:
- in.SetCapacity(1)
- in.SetPrefetch(false)
- rchan <- in.Accept().(Receiver)
- default:
- in.Accept()
- }
- }
- }()
-
- defer func() { closeClientServer(client, server) }()
-
s := make([]Sender, nLinks)
- for i := 0; i < nLinks; i++ {
- var err error
- s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
- if err != nil {
- t.Fatal(err)
- }
- }
r := make([]Receiver, nLinks)
for i := 0; i < nLinks; i++ {
- r[i] = <-rchan
+ s[i], r[i] = p.sender(Target(fmt.Sprintf("foo%d", i)))
}
for i := 0; i < nLinks; i++ {
@@ -146,16 +50,12 @@ func TestClientSendServerReceive(t *testing.T) {
m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
var err error
s[i].SendAsync(m, ack, "testing")
- if err != nil {
- t.Fatal(err)
- }
+ fatalIf(t, err)
}()
// Server receive
rm, err := r[i].Receive()
- if err != nil {
- t.Fatal(err)
- }
+ fatalIf(t, err)
if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
t.Errorf("%#v != %#v", want, got)
}
@@ -182,76 +82,28 @@ func TestClientSendServerReceive(t *testing.T) {
func TestClientReceiver(t *testing.T) {
nMessages := 3
- client, server := newClientServer(t)
+ p := newPipe(t, nil, nil)
+ defer func() { p.close() }()
+ r, s := p.receiver(Source("foo"), Capacity(nMessages), Prefetch(true))
go func() {
- for in := range server.Incoming() {
- switch in := in.(type) {
- case *IncomingSender:
- s := in.Accept().(Sender)
- go func() {
- for i := int32(0); i < int32(nMessages); i++ {
- out := s.SendSync(amqp.NewMessageWith(i))
- if out.Error != nil {
- t.Error(out.Error)
- return
- }
- }
- s.Close(nil)
- }()
- default:
- in.Accept()
- }
+ for i := 0; i < nMessages; i++ { // Server sends
+ out := s.SendSync(amqp.NewMessageWith(int32(i)))
+ fatalIf(t, out.Error)
}
}()
-
- r, err := client.Receiver(Source("foo"))
- if err != nil {
- t.Fatal(err)
- }
- for i := int32(0); i < int32(nMessages); i++ {
+ for i := 0; i < nMessages; i++ { // Client receives
rm, err := r.Receive()
- if err != nil {
- if err != Closed {
- t.Error(err)
- }
- break
- }
- if err := rm.Accept(); err != nil {
- t.Error(err)
- }
- if b, ok := rm.Message.Body().(int32); !ok || b != i {
- t.Errorf("want %v, true got %v, %v", i, b, ok)
- }
+ fatalIf(t, err)
+ errorIf(t, checkEqual(int32(i), rm.Message.Body()))
+ errorIf(t, rm.Accept())
}
- server.Close(nil)
- client.Connection().Close(nil)
}
// Test timeout versions of waiting functions.
func TestTimeouts(t *testing.T) {
- var err error
- rchan := make(chan Receiver, 1)
- client, server := newClientServer(t)
- go func() {
- for i := range server.Incoming() {
- switch i := i.(type) {
- case *IncomingReceiver:
- i.SetCapacity(1)
- i.SetPrefetch(false)
- rchan <- i.Accept().(Receiver) // Issue credit only on receive
- default:
- i.Accept()
- }
- }
- }()
- defer func() { closeClientServer(client, server) }()
-
- // Open client sender
- snd, err := client.Sender(Target("test"))
- if err != nil {
- t.Fatal(err)
- }
- rcv := <-rchan
+ p := newPipe(t, nil, nil)
+ defer func() { p.close() }()
+ snd, rcv := p.sender(Target("test"))
// Test send with timeout
short := time.Millisecond
@@ -264,11 +116,11 @@ func TestTimeouts(t *testing.T) {
t.Error("want Timeout got", err)
}
// Test receive with timeout
- if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
+ if _, err := rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
t.Error("want Timeout got", err)
}
// Test receive with timeout
- if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
+ if _, err := rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
t.Error("want Timeout got", err)
}
// There is now a credit on the link due to receive
@@ -295,57 +147,6 @@ func TestTimeouts(t *testing.T) {
}
}
-// A server that returns the opposite end of each client link via channels.
-type pairs struct {
- t *testing.T
- client Session
- server Connection
- rchan chan Receiver
- schan chan Sender
- capacity int
- prefetch bool
-}
-
-func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
- p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
- p.client, p.server = newClientServer(t)
- go func() {
- for i := range p.server.Incoming() {
- switch i := i.(type) {
- case *IncomingReceiver:
- i.SetCapacity(capacity)
- i.SetPrefetch(prefetch)
- p.rchan <- i.Accept().(Receiver)
- case *IncomingSender:
- p.schan <- i.Accept().(Sender)
- default:
- i.Accept()
- }
- }
- }()
- return p
-}
-
-func (p *pairs) close() {
- closeClientServer(p.client, p.server)
-}
-
-// Return a client sender and server receiver
-func (p *pairs) senderReceiver() (Sender, Receiver) {
- snd, err := p.client.Sender()
- fatalIf(p.t, err)
- rcv := <-p.rchan
- return snd, rcv
-}
-
-// Return a client receiver and server sender
-func (p *pairs) receiverSender() (Receiver, Sender) {
- rcv, err := p.client.Receiver()
- fatalIf(p.t, err)
- snd := <-p.schan
- return rcv, snd
-}
-
type result struct {
label string
err error
@@ -370,8 +171,9 @@ func doDisposition(ack <-chan Outcome, results chan result) {
// Senders get credit immediately if receivers have prefetch set
func TestSendReceivePrefetch(t *testing.T) {
- pairs := newPairs(t, 1, true)
- s, r := pairs.senderReceiver()
+ p := newPipe(t, nil, nil)
+ p.prefetch = true
+ s, r := p.sender()
s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
if _, err := r.Receive(); err != nil {
t.Error(err)
@@ -380,8 +182,9 @@ func TestSendReceivePrefetch(t *testing.T) {
// Senders do not get credit till Receive() if receivers don't have prefetch
func TestSendReceiveNoPrefetch(t *testing.T) {
- pairs := newPairs(t, 1, false)
- s, r := pairs.senderReceiver()
+ p := newPipe(t, nil, nil)
+ p.prefetch = false
+ s, r := p.sender()
done := make(chan struct{}, 1)
go func() {
s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
@@ -402,14 +205,14 @@ func TestSendReceiveNoPrefetch(t *testing.T) {
// Test that closing Links interrupts blocked link functions.
func TestLinkCloseInterrupt(t *testing.T) {
want := amqp.Error{Name: "x", Description: "all bad"}
- pairs := newPairs(t, 1, false)
+ p := newPipe(t, nil, nil)
results := make(chan result) // Collect expected errors
// Note closing the link does not interrupt Send() calls, the AMQP spec says
// that deliveries can be settled after the link is closed.
// Receiver.Close() interrupts Receive()
- snd, rcv := pairs.senderReceiver()
+ snd, rcv := p.sender()
go doReceive(rcv, results)
rcv.Close(want)
if r := <-results; want != r.err {
@@ -417,7 +220,7 @@ func TestLinkCloseInterrupt(t *testing.T) {
}
// Remote Sender.Close() interrupts Receive()
- snd, rcv = pairs.senderReceiver()
+ snd, rcv = p.sender()
go doReceive(rcv, results)
snd.Close(want)
if r := <-results; want != r.err {
@@ -428,27 +231,28 @@ func TestLinkCloseInterrupt(t *testing.T) {
// Test closing the server end of a connection.
func TestConnectionCloseInterrupt1(t *testing.T) {
want := amqp.Error{Name: "x", Description: "bad"}
- pairs := newPairs(t, 1, true)
+ p := newSocketPair(t, nil, nil)
+ p.prefetch = true
results := make(chan result) // Collect expected errors
// Connection.Close() interrupts Send, Receive, Disposition.
- snd, rcv := pairs.senderReceiver()
+ snd, rcv := p.sender()
go doSend(snd, results)
if _, err := rcv.Receive(); err != nil {
t.Error("receive", err)
}
- rcv, snd = pairs.receiverSender()
+ rcv, snd = p.receiver()
go doReceive(rcv, results)
- snd, rcv = pairs.senderReceiver()
+ snd, rcv = p.sender()
ack := snd.SendWaitable(amqp.NewMessage())
if _, err := rcv.Receive(); err != nil {
t.Error("receive", err)
}
go doDisposition(ack, results)
- pairs.server.Close(want)
+ p.server.Close(want)
for i := 0; i < 3; i++ {
if r := <-results; want != r.err {
t.Errorf("want %v got %v", want, r)
@@ -459,24 +263,25 @@ func TestConnectionCloseInterrupt1(t *testing.T) {
// Test closing the client end of the connection.
func TestConnectionCloseInterrupt2(t *testing.T) {
want := amqp.Error{Name: "x", Description: "bad"}
- pairs := newPairs(t, 1, true)
+ p := newSocketPair(t, nil, nil)
+ p.prefetch = true
results := make(chan result) // Collect expected errors
// Connection.Close() interrupts Send, Receive, Disposition.
- snd, rcv := pairs.senderReceiver()
+ snd, rcv := p.sender()
go doSend(snd, results)
if _, err := rcv.Receive(); err != nil {
t.Error("receive", err)
}
- rcv, snd = pairs.receiverSender()
+ rcv, snd = p.receiver()
go doReceive(rcv, results)
- snd, rcv = pairs.senderReceiver()
+ snd, rcv = p.sender()
ack := snd.SendWaitable(amqp.NewMessage())
go doDisposition(ack, results)
- pairs.client.Connection().Close(want)
+ p.client.Connection().Close(want)
for i := 0; i < 3; i++ {
if r := <-results; want != r.err {
t.Errorf("want %v got %v", want, r.err)
@@ -484,63 +289,43 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
}
}
-func heartbeat(c Connection) time.Duration {
- return c.(*connection).engine.Transport().RemoteIdleTimeout()
-}
-
func TestHeartbeat(t *testing.T) {
- client, server := newClientServerOpts(t,
- []ConnectionOption{Heartbeat(102 * time.Millisecond)},
- nil)
- defer closeClientServer(client, server)
-
- var serverHeartbeat time.Duration
-
- go func() {
- for in := range server.Incoming() {
- switch in := in.(type) {
- case *IncomingConnection:
- serverHeartbeat = in.Heartbeat()
- in.AcceptConnection(Heartbeat(101 * time.Millisecond))
- default:
- in.Accept()
- }
- }
- }()
+ p := newSocketPair(t,
+ []ConnectionOption{Heartbeat(12 * time.Millisecond)},
+ []ConnectionOption{Heartbeat(11 * time.Millisecond)})
+ defer func() { p.close() }()
- // Freeze the server to stop it sending heartbeats.
+ // Function to freeze the server to stop it sending heartbeats.
unfreeze := make(chan bool)
defer close(unfreeze)
- freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) }
+ freeze := func() error { return p.server.(*connection).engine.Inject(func() { <-unfreeze }) }
- fatalIf(t, client.Sync())
- errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection())))
- errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat))
- errorIf(t, client.Connection().Error())
+ fatalIf(t, p.client.Sync())
+ errorIf(t, checkEqual(11*time.Millisecond, p.client.Connection().Heartbeat()))
+ errorIf(t, checkEqual(12*time.Millisecond, p.server.Heartbeat()))
// Freeze the server for less than a heartbeat
fatalIf(t, freeze())
- time.Sleep(50 * time.Millisecond)
+ time.Sleep(5 * time.Millisecond)
unfreeze <- true
// Make sure server is still responding.
- s, err := client.Sender()
- errorIf(t, err)
+ s, _ := p.sender()
errorIf(t, s.Sync())
- // Freeze the server till the client times out the connection
+ // Freeze the server till the client times out the connection
fatalIf(t, freeze())
select {
- case <-client.Done():
- if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name {
- t.Error("bad timeout error:", client.Error())
+ case <-p.client.Done():
+ if amqp.ResourceLimitExceeded != p.client.Error().(amqp.Error).Name {
+ t.Error("bad timeout error:", p.client.Error())
}
case <-time.After(400 * time.Millisecond):
t.Error("connection failed to time out")
}
unfreeze <- true // Unfreeze the server
- <-server.Done()
- if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name {
- t.Error("bad timeout error:", server.Error())
+ <-p.server.Done()
+ if amqp.ResourceLimitExceeded != p.server.Error().(amqp.Error).Name {
+ t.Error("bad timeout error:", p.server.Error())
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[07/12] qpid-proton git commit: PROTON-1955: [go] incorrect
conversion between Go time and AMQP time
Posted by ac...@apache.org.
PROTON-1955: [go] incorrect conversion between Go time and AMQP time
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ccaeaa0c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ccaeaa0c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ccaeaa0c
Branch: refs/heads/master
Commit: ccaeaa0cbcc1364b3265984ef15f5d92c6087793
Parents: 32c7036
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Oct 5 15:38:48 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:14:15 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/amqp/marshal.go | 2 +-
go/src/qpid.apache.org/amqp/types.go | 26 ++++++++++++++++--------
go/src/qpid.apache.org/amqp/types_test.go | 28 +++++++++++++++++++++++++-
go/src/qpid.apache.org/amqp/unmarshal.go | 4 ++--
4 files changed, 48 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccaeaa0c/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/marshal.go b/go/src/qpid.apache.org/amqp/marshal.go
index 99584a2..d846993 100644
--- a/go/src/qpid.apache.org/amqp/marshal.go
+++ b/go/src/qpid.apache.org/amqp/marshal.go
@@ -224,7 +224,7 @@ func marshal(i interface{}, data *C.pn_data_t) {
// Other simple types
case time.Time:
- C.pn_data_put_timestamp(data, C.pn_timestamp_t(v.UnixNano()/1000))
+ C.pn_data_put_timestamp(data, pnTime(v))
case UUID:
C.pn_data_put_uuid(data, *(*C.pn_uuid_t)(unsafe.Pointer(&v[0])))
case Char:
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccaeaa0c/go/src/qpid.apache.org/amqp/types.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/types.go b/go/src/qpid.apache.org/amqp/types.go
index a1fe2ac..04226ff 100644
--- a/go/src/qpid.apache.org/amqp/types.go
+++ b/go/src/qpid.apache.org/amqp/types.go
@@ -163,19 +163,29 @@ func (l List) GoString() string {
}
// pnTime converts Go time.Time to Proton millisecond Unix time.
+// Take care to convert zero values to zero values.
func pnTime(t time.Time) C.pn_timestamp_t {
- secs := t.Unix()
- // Note: sub-second accuracy is not guaranteed if the Unix time in
- // nanoseconds cannot be represented by an int64 (sometime around year 2260)
- msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
- return C.pn_timestamp_t(secs*1000 + msecs)
+ if t.IsZero() {
+ return C.pn_timestamp_t(0)
+ }
+ return C.pn_timestamp_t(t.UnixNano() / int64(time.Millisecond))
}
// goTime converts a pn_timestamp_t to a Go time.Time.
+// Take care to convert zero values to zero values.
func goTime(t C.pn_timestamp_t) time.Time {
- secs := int64(t) / 1000
- nsecs := (int64(t) % 1000) * int64(time.Millisecond)
- return time.Unix(secs, nsecs)
+ if t == 0 {
+ return time.Time{}
+ }
+ return time.Unix(0, int64(t)*int64(time.Millisecond))
+}
+
+func pnDuration(d time.Duration) C.pn_millis_t {
+ return (C.pn_millis_t)(d / (time.Millisecond))
+}
+
+func goDuration(d C.pn_millis_t) time.Duration {
+ return time.Duration(d) * time.Millisecond
}
func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccaeaa0c/go/src/qpid.apache.org/amqp/types_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/types_test.go b/go/src/qpid.apache.org/amqp/types_test.go
index 994a3cd..d7e54f1 100644
--- a/go/src/qpid.apache.org/amqp/types_test.go
+++ b/go/src/qpid.apache.org/amqp/types_test.go
@@ -28,7 +28,7 @@ import (
func checkEqual(want interface{}, got interface{}) error {
if !reflect.DeepEqual(want, got) {
- return fmt.Errorf("%#v != %#v", want, got)
+ return fmt.Errorf("(%T)%#v != (%T)%#v", want, want, got, got)
}
return nil
}
@@ -200,3 +200,29 @@ func TestDescribed(t *testing.T) {
t.Error(err)
}
}
+
+func TestTimeConversion(t *testing.T) {
+ pt := pnTime(timeValue)
+ if err := checkEqual(timeValue, goTime(pt)); err != nil {
+ t.Error(err)
+ }
+ if err := checkEqual(pt, pnTime(goTime(pt))); err != nil {
+ t.Error(err)
+ }
+ ut := time.Unix(123, 456*1000000)
+ if err := checkEqual(123456, int(pnTime(ut))); err != nil {
+ t.Error(err)
+ }
+ if err := checkEqual(ut, goTime(123456)); err != nil {
+ t.Error(err)
+ }
+
+ // Preserve zero values
+ var tz time.Time
+ if err := checkEqual(0, int(pnTime(tz))); err != nil {
+ t.Error(err)
+ }
+ if err := checkEqual(tz, goTime(pnTime(tz))); err != nil {
+ t.Error(err)
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccaeaa0c/go/src/qpid.apache.org/amqp/unmarshal.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/unmarshal.go b/go/src/qpid.apache.org/amqp/unmarshal.go
index 2c6e3f1..054f115 100644
--- a/go/src/qpid.apache.org/amqp/unmarshal.go
+++ b/go/src/qpid.apache.org/amqp/unmarshal.go
@@ -487,7 +487,7 @@ func unmarshal(v interface{}, data *C.pn_data_t) {
case *time.Time:
panicUnless(pnType == C.PN_TIMESTAMP, data, v)
- *v = time.Unix(0, int64(C.pn_data_get_timestamp(data))*1000)
+ *v = goTime(C.pn_data_get_timestamp(data))
case *UUID:
panicUnless(pnType == C.PN_UUID, data, v)
@@ -566,7 +566,7 @@ func getInterface(data *C.pn_data_t, vp *interface{}) {
case C.PN_SYMBOL:
*vp = Symbol(goString(C.pn_data_get_symbol(data)))
case C.PN_TIMESTAMP:
- *vp = time.Unix(0, int64(C.pn_data_get_timestamp(data))*1000)
+ *vp = goTime(C.pn_data_get_timestamp(data))
case C.PN_UUID:
var u UUID
unmarshal(&u, data)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[09/12] qpid-proton git commit: NO-JIRA: Map Go errors to
amqp::internal-error
Posted by ac...@apache.org.
NO-JIRA: Map Go errors to amqp::internal-error
Previously the Go error type name was used, which is not useful and is often empty.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/84844a95
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/84844a95
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/84844a95
Branch: refs/heads/master
Commit: 84844a95c37d8e6dcdc397b9974769e07980e8ae
Parents: ccaeaa0
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Oct 5 17:05:16 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:14:20 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/amqp/error.go | 11 +++++++----
go/src/qpid.apache.org/proton/wrappers.go | 17 ++++++-----------
2 files changed, 13 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/84844a95/go/src/qpid.apache.org/amqp/error.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/error.go b/go/src/qpid.apache.org/amqp/error.go
index 3a178b2..a8974cb 100644
--- a/go/src/qpid.apache.org/amqp/error.go
+++ b/go/src/qpid.apache.org/amqp/error.go
@@ -24,7 +24,6 @@ import "C"
import (
"fmt"
- "reflect"
)
// Error is an AMQP error condition. It has a name and a description.
@@ -46,10 +45,14 @@ func Errorf(name, format string, arg ...interface{}) Error {
return Error{name, fmt.Sprintf(format, arg...)}
}
-// MakeError makes an AMQP error from a go error using the Go error type as the name
-// and the err.Error() string as the description.
+// MakeError makes an AMQP error from a go error: {Name: InternalError, Description: err.Error()}
+// If err is already an amqp.Error it is returned unchanged.
func MakeError(err error) Error {
- return Error{reflect.TypeOf(err).Name(), err.Error()}
+ if amqpErr, ok := err.(Error); ok {
+ return amqpErr
+ } else {
+ return Error{InternalError, err.Error()}
+ }
}
var (
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/84844a95/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/wrappers.go b/go/src/qpid.apache.org/proton/wrappers.go
index a7b7fb2..42b2a23 100644
--- a/go/src/qpid.apache.org/proton/wrappers.go
+++ b/go/src/qpid.apache.org/proton/wrappers.go
@@ -37,10 +37,10 @@ import "C"
import (
"fmt"
- "qpid.apache.org/amqp"
- "reflect"
"time"
"unsafe"
+
+ "qpid.apache.org/amqp"
)
// TODO aconway 2015-05-05: Documentation for generated types.
@@ -379,17 +379,12 @@ func (c Condition) Error() error {
return amqp.Error{Name: c.Name(), Description: c.Description()}
}
-// Set a Go error into a condition.
-// If it is not an amqp.Condition use the error type as name, error string as description.
+// Set a Go error into a condition, converting to an amqp.Error using amqp.MakeError
func (c Condition) SetError(err error) {
if err != nil {
- if cond, ok := err.(amqp.Error); ok {
- c.SetName(cond.Name)
- c.SetDescription(cond.Description)
- } else {
- c.SetName(reflect.TypeOf(err).Name())
- c.SetDescription(err.Error())
- }
+ cond := amqp.MakeError(err)
+ c.SetName(cond.Name)
+ c.SetDescription(cond.Description)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[11/12] qpid-proton git commit: PROTON-1910: [go] native Message
implementation
Posted by ac...@apache.org.
PROTON-1910: [go] native Message implementation
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/886d2b93
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/886d2b93
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/886d2b93
Branch: refs/heads/master
Commit: 886d2b9349f74c02219b29990ee04278124f5224
Parents: ef716fa
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 11 15:20:23 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:20:23 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/amqp/message.go | 521 +++++++++++++++--------
go/src/qpid.apache.org/amqp/message_test.go | 10 +-
2 files changed, 355 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message.go b/go/src/qpid.apache.org/amqp/message.go
index e514b26..919904c 100644
--- a/go/src/qpid.apache.org/amqp/message.go
+++ b/go/src/qpid.apache.org/amqp/message.go
@@ -35,10 +35,10 @@ package amqp
import "C"
import (
+ "bytes"
"fmt"
- "runtime"
+ "reflect"
"time"
- "unsafe"
)
// Message is the interface to an AMQP message.
@@ -124,11 +124,12 @@ type Message interface {
// Per-delivery annotations to provide delivery instructions.
// May be added or removed by intermediaries during delivery.
+ // See ApplicationProperties() for properties set by the application.
DeliveryAnnotations() map[AnnotationKey]interface{}
SetDeliveryAnnotations(map[AnnotationKey]interface{})
// Message annotations added as part of the bare message at creation, usually
- // by an AMQP library. See ApplicationProperties() for adding application data.
+ // by an AMQP library. See ApplicationProperties() for properties set by the application.
MessageAnnotations() map[AnnotationKey]interface{}
SetMessageAnnotations(map[AnnotationKey]interface{})
@@ -141,15 +142,18 @@ type Message interface {
Inferred() bool
SetInferred(bool)
- // Marshal a Go value into the message body. See amqp.Marshal() for details.
+ // Get the message body, using the amqp.Unmarshal() rules for interface{}
+ Body() interface{}
+
+ // Set the body using amqp.Marshal()
+ SetBody(interface{})
+
+ // Marshal a Go value into the message body, synonym for SetBody()
Marshal(interface{})
- // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
+ // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal()
Unmarshal(interface{})
- // Body value resulting from the default unmarshaling of message body as interface{}
- Body() interface{}
-
// Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
// the message is encoded into it, otherwise a new buffer is created.
// Returns the buffer containing the message.
@@ -158,7 +162,7 @@ type Message interface {
// Decode data into this message. Overwrites an existing message content.
Decode(buffer []byte) error
- // Clear the message contents.
+ // Clear the message contents, set all fields to the default value.
Clear()
// Copy the contents of another message to this one.
@@ -180,194 +184,205 @@ type Message interface {
String() string
}
-type message struct{ pn *C.pn_message_t }
-
-func freeMessage(m *message) {
- C.pn_message_free(m.pn)
- m.pn = nil
-}
-
// NewMessage creates a new message instance.
func NewMessage() Message {
- m := &message{C.pn_message()}
- runtime.SetFinalizer(m, freeMessage)
+ m := &message{}
+ m.Clear()
return m
}
-// NewMessageWith creates a message with value as the body. Equivalent to
-// m := NewMessage(); m.Marshal(body)
+// NewMessageWith creates a message with value as the body.
func NewMessageWith(value interface{}) Message {
m := NewMessage()
- m.Marshal(value)
+ m.SetBody(value)
return m
}
-func (m *message) Clear() { C.pn_message_clear(m.pn) }
-
-func (m *message) Copy(x Message) error {
- if data, err := x.Encode(nil); err == nil {
- return m.Decode(data)
- } else {
- return err
- }
-}
-
-// ==== message get functions
-
-func rewindGet(data *C.pn_data_t) (v interface{}) {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(&v, data)
- return v
-}
-
-func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) }
-func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) }
-func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
-func (m *message) TTL() time.Duration {
- return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
+// NewMessageCopy creates a copy of an existing message.
+func NewMessageCopy(m Message) Message {
+ m2 := NewMessage()
+ m2.Copy(m)
+ return m2
}
-func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) }
-func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) }
-func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) }
-func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) }
-func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) }
-func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) }
-func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
-func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
-func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) }
-func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
-func (m *message) ExpiryTime() time.Time {
- return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
-}
-func (m *message) CreationTime() time.Time {
- return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
-}
-func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) }
-func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) }
-func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
+// Reset message to all default values
+func (m *message) Clear() { *m = message{priority: 4} }
-func getAnnotations(data *C.pn_data_t) (v map[AnnotationKey]interface{}) {
- if C.pn_data_size(data) > 0 {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(&v, data)
+// Copy makes a deep copy of message x
+func (m *message) Copy(x Message) error {
+ var mc MessageCodec
+ bytes, err := mc.Encode(x, nil)
+ if err == nil {
+ err = mc.Decode(m, bytes)
}
- return v
-}
+ return err
+}
+
+type message struct {
+ address string
+ applicationProperties map[string]interface{}
+ contentEncoding string
+ contentType string
+ correlationId interface{}
+ creationTime time.Time
+ deliveryAnnotations map[AnnotationKey]interface{}
+ deliveryCount uint32
+ durable bool
+ expiryTime time.Time
+ firstAcquirer bool
+ groupId string
+ groupSequence int32
+ inferred bool
+ messageAnnotations map[AnnotationKey]interface{}
+ messageId interface{}
+ priority uint8
+ replyTo string
+ replyToGroupId string
+ subject string
+ ttl time.Duration
+ userId string
+ body interface{}
+ // Keep the original data to support Unmarshal to a non-interface{} type
+ // Waste of memory, consider deprecating or making it optional.
+ pnBody *C.pn_data_t
+}
+
+// ==== message get methods
+func (m *message) Body() interface{} { return m.body }
+func (m *message) Inferred() bool { return m.inferred }
+func (m *message) Durable() bool { return m.durable }
+func (m *message) Priority() uint8 { return m.priority }
+func (m *message) TTL() time.Duration { return m.ttl }
+func (m *message) FirstAcquirer() bool { return m.firstAcquirer }
+func (m *message) DeliveryCount() uint32 { return m.deliveryCount }
+func (m *message) MessageId() interface{} { return m.messageId }
+func (m *message) UserId() string { return m.userId }
+func (m *message) Address() string { return m.address }
+func (m *message) Subject() string { return m.subject }
+func (m *message) ReplyTo() string { return m.replyTo }
+func (m *message) CorrelationId() interface{} { return m.correlationId }
+func (m *message) ContentType() string { return m.contentType }
+func (m *message) ContentEncoding() string { return m.contentEncoding }
+func (m *message) ExpiryTime() time.Time { return m.expiryTime }
+func (m *message) CreationTime() time.Time { return m.creationTime }
+func (m *message) GroupId() string { return m.groupId }
+func (m *message) GroupSequence() int32 { return m.groupSequence }
+func (m *message) ReplyToGroupId() string { return m.replyToGroupId }
func (m *message) DeliveryAnnotations() map[AnnotationKey]interface{} {
- return getAnnotations(C.pn_message_instructions(m.pn))
+ if m.deliveryAnnotations == nil {
+ m.deliveryAnnotations = make(map[AnnotationKey]interface{})
+ }
+ return m.deliveryAnnotations
}
func (m *message) MessageAnnotations() map[AnnotationKey]interface{} {
- return getAnnotations(C.pn_message_annotations(m.pn))
+ if m.messageAnnotations == nil {
+ m.messageAnnotations = make(map[AnnotationKey]interface{})
+ }
+ return m.messageAnnotations
}
-
func (m *message) ApplicationProperties() map[string]interface{} {
- var v map[string]interface{}
- data := C.pn_message_properties(m.pn)
- if C.pn_data_size(data) > 0 {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(&v, data)
+ if m.applicationProperties == nil {
+ m.applicationProperties = make(map[string]interface{})
}
- return v
+ return m.applicationProperties
}
// ==== message set methods
-func setData(v interface{}, data *C.pn_data_t) {
- C.pn_data_clear(data)
- marshal(v, data)
-}
+func (m *message) SetBody(v interface{}) { m.body = v }
+func (m *message) SetInferred(x bool) { m.inferred = x }
+func (m *message) SetDurable(x bool) { m.durable = x }
+func (m *message) SetPriority(x uint8) { m.priority = x }
+func (m *message) SetTTL(x time.Duration) { m.ttl = x }
+func (m *message) SetFirstAcquirer(x bool) { m.firstAcquirer = x }
+func (m *message) SetDeliveryCount(x uint32) { m.deliveryCount = x }
+func (m *message) SetMessageId(x interface{}) { m.messageId = x }
+func (m *message) SetUserId(x string) { m.userId = x }
+func (m *message) SetAddress(x string) { m.address = x }
+func (m *message) SetSubject(x string) { m.subject = x }
+func (m *message) SetReplyTo(x string) { m.replyTo = x }
+func (m *message) SetCorrelationId(x interface{}) { m.correlationId = x }
+func (m *message) SetContentType(x string) { m.contentType = x }
+func (m *message) SetContentEncoding(x string) { m.contentEncoding = x }
+func (m *message) SetExpiryTime(x time.Time) { m.expiryTime = x }
+func (m *message) SetCreationTime(x time.Time) { m.creationTime = x }
+func (m *message) SetGroupId(x string) { m.groupId = x }
+func (m *message) SetGroupSequence(x int32) { m.groupSequence = x }
+func (m *message) SetReplyToGroupId(x string) { m.replyToGroupId = x }
+
+func (m *message) SetDeliveryAnnotations(x map[AnnotationKey]interface{}) {
+ m.deliveryAnnotations = x
+}
+func (m *message) SetMessageAnnotations(x map[AnnotationKey]interface{}) {
+ m.messageAnnotations = x
+}
+func (m *message) SetApplicationProperties(x map[string]interface{}) {
+ m.applicationProperties = x
+}
+
+// Marshal body from v, same as SetBody(v). See amqp.Marshal.
+func (m *message) Marshal(v interface{}) { m.body = v }
-func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(b)) }
-func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) }
-func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
-func (m *message) SetTTL(d time.Duration) {
- C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
-}
-func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
-func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
-func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
-func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
-func (m *message) SetAddress(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
-}
-func (m *message) SetSubject(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
-}
-func (m *message) SetReplyTo(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
-}
-func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
-func (m *message) SetContentType(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
-}
-func (m *message) SetContentEncoding(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
-}
-func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
-func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
-func (m *message) SetGroupId(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
-}
-func (m *message) SetGroupSequence(s int32) {
- C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
-}
-func (m *message) SetReplyToGroupId(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
+// Unmarshal body to v, which must be a pointer to a value. See amqp.Unmarshal.
+func (m *message) Unmarshal(v interface{}) {
+ // FIXME aconway 2018-09-28: this is inefficient, replace with a
+ // reflective conversion from the existing body value that respects
+ // the Unmarshal() rules.
+ pnData := C.pn_data(2)
+ // pn_data() allocates C memory that the Go garbage collector never
+ // reclaims, so release the scratch buffer on every exit path.
+ defer C.pn_data_free(pnData)
+ marshal(m.body, pnData)
+ unmarshal(v, pnData)
}
-func (m *message) SetDeliveryAnnotations(v map[AnnotationKey]interface{}) {
- setData(v, C.pn_message_instructions(m.pn))
-}
-func (m *message) SetMessageAnnotations(v map[AnnotationKey]interface{}) {
- setData(v, C.pn_message_annotations(m.pn))
+// MessageCodec encodes and decodes messages, caching a pn_message_t
+// between calls. Call Close() to free the cached pn_message_t.
+//
+// Internal use only
+type MessageCodec struct {
+ pn *C.pn_message_t // Cache a pn_message_t to speed up encode/decode
+ // Optionally remember a byte buffer to use with MessageCodec methods.
+ Buffer []byte
}
-func (m *message) SetApplicationProperties(v map[string]interface{}) {
- setData(v, C.pn_message_properties(m.pn))
-}
-
-// Marshal body from v
-func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) }
-// Unmarshal body to v, which must be a pointer to a value. See amqp.Unmarshal
-func (m *message) Unmarshal(v interface{}) {
- data := C.pn_message_body(m.pn)
- if C.pn_data_size(data) > 0 {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(v, data)
+func (mc *MessageCodec) pnMessage() *C.pn_message_t {
+ if mc.pn == nil {
+ mc.pn = C.pn_message()
}
- return
+ return mc.pn
}
-// Return the body value as an interface
-func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
-
-func (m *message) Decode(data []byte) error {
- m.Clear()
- if len(data) == 0 {
- return fmt.Errorf("empty buffer for decode")
+func (mc *MessageCodec) Close() {
+ if mc.pn != nil {
+ C.pn_message_free(mc.pn)
+ mc.pn = nil
}
- if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
- return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn)))
+}
+
+// Decode data into m, replacing any previous content of m.
+// m must hold a *message: the unchecked type assertion below panics for
+// any other Message implementation.
+func (mc *MessageCodec) Decode(m Message, data []byte) error {
+ pn := mc.pnMessage()
+ if C.pn_message_decode(pn, cPtr(data), cLen(data)) < 0 {
+ return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(pn)))
}
+ // NOTE(review): the replaced (*message).Decode rejected an empty buffer
+ // with "empty buffer for decode"; that check is gone here - confirm intended.
+ m.(*message).get(pn)
return nil
}
+func (m *message) Decode(data []byte) error {
+ var mc MessageCodec
+ defer mc.Close()
+ return mc.Decode(m, data)
+}
+
func DecodeMessage(data []byte) (m Message, err error) {
m = NewMessage()
err = m.Decode(data)
return
}
-func (m *message) Encode(buffer []byte) ([]byte, error) {
+// Encode m using buffer. Return the final buffer used to hold m, which
+// may be different if the initial buffer was not large enough.
+func (mc *MessageCodec) Encode(m Message, buffer []byte) ([]byte, error) {
+ pn := mc.pnMessage()
+ m.(*message).put(pn)
encode := func(buf []byte) ([]byte, error) {
len := cLen(buf)
- result := C.pn_message_encode(m.pn, cPtr(buf), &len)
+ result := C.pn_message_encode(pn, cPtr(buf), &len)
switch {
case result == C.PN_OVERFLOW:
return buf, overflow
@@ -380,50 +395,214 @@ func (m *message) Encode(buffer []byte) ([]byte, error) {
return encodeGrow(buffer, encode)
}
+func (m *message) Encode(buffer []byte) ([]byte, error) {
+ var mc MessageCodec
+ defer mc.Close()
+ return mc.Encode(m, buffer)
+}
+
// TODO aconway 2015-09-14: Multi-section messages.
-func (m *message) String() string {
- str := C.pn_string(C.CString(""))
- defer C.pn_free(unsafe.Pointer(str))
- C.pn_inspect(unsafe.Pointer(m.pn), str)
- return C.GoString(C.pn_string_get(str))
+type ignoreFunc func(v interface{}) bool
+
+// isNil reports whether v is the nil interface value.
+func isNil(v interface{}) bool { return v == nil }
+
+// isZero reports whether v equals the zero value of its dynamic type.
+// v must be non-nil and of a comparable type, otherwise this panics.
+func isZero(v interface{}) bool { return v == reflect.Zero(reflect.TypeOf(v)).Interface() }
+
+// isEmpty reports whether v has length 0. v must be of a kind accepted by
+// reflect.Value.Len (e.g. string or map), otherwise this panics.
+func isEmpty(v interface{}) bool { return reflect.ValueOf(v).Len() == 0 }
+
+type stringBuilder struct {
+ bytes.Buffer
+ separator string
}
-// ==== Deprecated functions
-func oldGetAnnotations(data *C.pn_data_t) (v map[string]interface{}) {
- if C.pn_data_size(data) > 0 {
+func (b *stringBuilder) field(name string, value interface{}, ignore ignoreFunc) {
+ if !ignore(value) {
+ b.WriteString(b.separator)
+ b.separator = ", "
+ b.WriteString(name)
+ b.WriteString(": ")
+ fmt.Fprintf(&b.Buffer, "%v", value)
+ }
+}
+
+// Human-readable string describing message.
+// Includes only message fields with non-default values.
+func (m *message) String() string {
+ var b stringBuilder
+ b.WriteString("Message{")
+ b.field("address", m.address, isEmpty)
+ b.field("durable", m.durable, isZero)
+ // Priority has weird default
+ b.field("priority", m.priority, func(v interface{}) bool { return v.(uint8) == 4 })
+ b.field("ttl", m.ttl, isZero)
+ b.field("first-acquirer", m.firstAcquirer, isZero)
+ b.field("delivery-count", m.deliveryCount, isZero)
+ b.field("message-id", m.messageId, isNil)
+ b.field("user-id", m.userId, isEmpty)
+ b.field("subject", m.subject, isEmpty)
+ b.field("reply-to", m.replyTo, isEmpty)
+ b.field("correlation-id", m.correlationId, isNil)
+ b.field("content-type", m.contentType, isEmpty)
+ b.field("content-encoding", m.contentEncoding, isEmpty)
+ b.field("expiry-time", m.expiryTime, isZero)
+ b.field("creation-time", m.creationTime, isZero)
+ b.field("group-id", m.groupId, isEmpty)
+ b.field("group-sequence", m.groupSequence, isZero)
+ b.field("reply-to-group-id", m.replyToGroupId, isEmpty)
+ b.field("inferred", m.inferred, isZero)
+ b.field("delivery-annotations", m.deliveryAnnotations, isEmpty)
+ b.field("message-annotations", m.messageAnnotations, isEmpty)
+ b.field("application-properties", m.applicationProperties, isEmpty)
+ b.field("body", m.body, isNil)
+ b.WriteString("}")
+ return b.String()
+}
+
+// ==== get message from pn_message_t
+
+func getData(v interface{}, data *C.pn_data_t) {
+ if data != nil && C.pn_data_size(data) > 0 {
C.pn_data_rewind(data)
C.pn_data_next(data)
- unmarshal(&v, data)
+ unmarshal(v, data)
+ }
+ return
+}
+
+func getString(c *C.char) string {
+ if c == nil {
+ return ""
+ }
+ return C.GoString(c)
+}
+
+func (m *message) get(pn *C.pn_message_t) {
+ m.Clear()
+ m.inferred = bool(C.pn_message_is_inferred(pn))
+ m.durable = bool(C.pn_message_is_durable(pn))
+ m.priority = uint8(C.pn_message_get_priority(pn))
+ m.ttl = goDuration(C.pn_message_get_ttl(pn))
+ m.firstAcquirer = bool(C.pn_message_is_first_acquirer(pn))
+ m.deliveryCount = uint32(C.pn_message_get_delivery_count(pn))
+ getData(&m.messageId, C.pn_message_id(pn))
+ m.userId = string(goBytes(C.pn_message_get_user_id(pn)))
+ m.address = getString(C.pn_message_get_address(pn))
+ m.subject = getString(C.pn_message_get_subject(pn))
+ m.replyTo = getString(C.pn_message_get_reply_to(pn))
+ getData(&m.correlationId, C.pn_message_correlation_id(pn))
+ m.contentType = getString(C.pn_message_get_content_type(pn))
+ m.contentEncoding = getString(C.pn_message_get_content_encoding(pn))
+ m.expiryTime = goTime(C.pn_message_get_expiry_time(pn))
+ m.creationTime = goTime(C.pn_message_get_creation_time(pn))
+ m.groupId = getString(C.pn_message_get_group_id(pn))
+ m.groupSequence = int32(C.pn_message_get_group_sequence(pn))
+ m.replyToGroupId = getString(C.pn_message_get_reply_to_group_id(pn))
+ getData(&m.deliveryAnnotations, C.pn_message_instructions(pn))
+ getData(&m.messageAnnotations, C.pn_message_annotations(pn))
+ getData(&m.applicationProperties, C.pn_message_properties(pn))
+ getData(&m.body, C.pn_message_body(pn))
+}
+
+// ==== put message to pn_message_t
+
+func putData(v interface{}, pn *C.pn_data_t) {
+ if v != nil {
+ C.pn_data_clear(pn)
+ marshal(v, pn)
+ }
+}
+
+// put copies the fields of m into pn, after clearing pn.
+//
+// For pointer-based fields (pn_data_t, strings, bytes) only
+// put a field if it has a non-empty value
+//
+// NOTE(review): every C.CString() below allocates with malloc and is never
+// freed here. If the pn_message_set_*() functions copy their argument (TODO
+// confirm against the Proton C API), each string field leaks on every put().
+// The replaced setters went through C.msg_set_str - check whether that helper
+// freed the string.
+func (m *message) put(pn *C.pn_message_t) {
+ C.pn_message_clear(pn)
+ C.pn_message_set_inferred(pn, C.bool(m.inferred))
+ C.pn_message_set_durable(pn, C.bool(m.durable))
+ C.pn_message_set_priority(pn, C.uint8_t(m.priority))
+ C.pn_message_set_ttl(pn, pnDuration(m.ttl))
+ C.pn_message_set_first_acquirer(pn, C.bool(m.firstAcquirer))
+ C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount))
+ putData(m.messageId, C.pn_message_id(pn))
+ if m.userId != "" {
+ C.pn_message_set_user_id(pn, pnBytes(([]byte)(m.userId)))
+ }
+ if m.address != "" {
+ C.pn_message_set_address(pn, C.CString(m.address))
+ }
+ if m.subject != "" {
+ C.pn_message_set_subject(pn, C.CString(m.subject))
+ }
+ if m.replyTo != "" {
+ C.pn_message_set_reply_to(pn, C.CString(m.replyTo))
+ }
+ putData(m.correlationId, C.pn_message_correlation_id(pn))
+ if m.contentType != "" {
+ C.pn_message_set_content_type(pn, C.CString(m.contentType))
+ }
+ if m.contentEncoding != "" {
+ C.pn_message_set_content_encoding(pn, C.CString(m.contentEncoding))
+ }
+ C.pn_message_set_expiry_time(pn, pnTime(m.expiryTime))
+ C.pn_message_set_creation_time(pn, pnTime(m.creationTime))
+ if m.groupId != "" {
+ C.pn_message_set_group_id(pn, C.CString(m.groupId))
}
- return v
+ C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.groupSequence))
+ if m.replyToGroupId != "" {
+ C.pn_message_set_reply_to_group_id(pn, C.CString(m.replyToGroupId))
+ }
+ if len(m.deliveryAnnotations) != 0 {
+ putData(m.deliveryAnnotations, C.pn_message_instructions(pn))
+ }
+ if len(m.messageAnnotations) != 0 {
+ putData(m.messageAnnotations, C.pn_message_annotations(pn))
+ }
+ if len(m.applicationProperties) != 0 {
+ putData(m.applicationProperties, C.pn_message_properties(pn))
+ }
+ putData(m.body, C.pn_message_body(pn))
+}
+
+// ==== Deprecated functions
+
+// Convert an AnnotationKey-keyed map to the old string-keyed form, using
+// each key's String(). An empty or nil input yields a nil map.
+func oldAnnotations(in map[AnnotationKey]interface{}) (out map[string]interface{}) {
+ if len(in) == 0 {
+ return nil
+ }
+ out = make(map[string]interface{})
+ for k, v := range in {
+ out[k.String()] = v
+ }
+ return
+}
func (m *message) Instructions() map[string]interface{} {
- return oldGetAnnotations(C.pn_message_instructions(m.pn))
+ return oldAnnotations(m.deliveryAnnotations)
}
func (m *message) Annotations() map[string]interface{} {
- return oldGetAnnotations(C.pn_message_annotations(m.pn))
+ return oldAnnotations(m.messageAnnotations)
}
+// Properties is the deprecated name for ApplicationProperties().
+// Delegating keeps the two accessors consistent: both return the same,
+// never-nil map regardless of which one is called first.
func (m *message) Properties() map[string]interface{} {
- return oldGetAnnotations(C.pn_message_properties(m.pn))
+ return m.ApplicationProperties()
}
// Convert old string-keyed annotations to an AnnotationKey map
-func fixAnnotations(old map[string]interface{}) (annotations map[AnnotationKey]interface{}) {
- annotations = make(map[AnnotationKey]interface{})
- for k, v := range old {
- annotations[AnnotationKeyString(k)] = v
+func newAnnotations(in map[string]interface{}) (out map[AnnotationKey]interface{}) {
+ if len(in) == 0 {
+ return nil
+ }
+ out = make(map[AnnotationKey]interface{})
+ for k, v := range in {
+ out[AnnotationKeyString(k)] = v
}
return
}
func (m *message) SetInstructions(v map[string]interface{}) {
- setData(fixAnnotations(v), C.pn_message_instructions(m.pn))
+ m.deliveryAnnotations = newAnnotations(v)
}
func (m *message) SetAnnotations(v map[string]interface{}) {
- setData(fixAnnotations(v), C.pn_message_annotations(m.pn))
+ m.messageAnnotations = newAnnotations(v)
}
func (m *message) SetProperties(v map[string]interface{}) {
- setData(fixAnnotations(v), C.pn_message_properties(m.pn))
+ m.applicationProperties = v
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go
index b22c60d..668ca49 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -58,14 +58,14 @@ func TestDefaultMessage(t *testing.T) {
{"ReplyToGroupId", ""},
{"MessageId", nil},
{"CorrelationId", nil},
- {"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
- {"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
- {"ApplicationProperties", map[string]interface{}(nil)},
+ {"DeliveryAnnotations", map[AnnotationKey]interface{}{}},
+ {"MessageAnnotations", map[AnnotationKey]interface{}{}},
+ {"ApplicationProperties", map[string]interface{}{}},
// Deprecated
{"Instructions", map[string]interface{}(nil)},
{"Annotations", map[string]interface{}(nil)},
- {"Properties", map[string]interface{}(nil)},
+ {"Properties", map[string]interface{}{}},
{"Body", nil},
} {
ret := mv.MethodByName(x.method).Call(nil)
@@ -91,7 +91,7 @@ func TestMessageString(t *testing.T) {
if err := roundTrip(m); err != nil {
t.Error(err)
}
- msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
+ msgstr := "Message{user-id: user, delivery-annotations: map[instructions:foo], message-annotations: map[annotations:bar], application-properties: map[int:32], body: hello}"
if err := checkEqual(msgstr, m.String()); err != nil {
t.Error(err)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[04/12] qpid-proton git commit: NO-JIRA: Fix go vet -v warnings,
minor typos
Posted by ac...@apache.org.
NO-JIRA: Fix go vet -v warnings, minor typos
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f46076e4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f46076e4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f46076e4
Branch: refs/heads/master
Commit: f46076e4c19f05584f1724ef52930821c8d66c38
Parents: 55b2735
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Oct 5 15:38:14 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:33:08 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/amqp/types_test.go | 2 +-
go/src/qpid.apache.org/electron/common_test.go | 2 +-
go/src/qpid.apache.org/electron/connection.go | 5 ++-
go/src/qpid.apache.org/electron/error.go | 35 ---------------------
go/src/qpid.apache.org/electron/receiver.go | 8 +++--
go/src/qpid.apache.org/proton/error.go | 13 +-------
6 files changed, 12 insertions(+), 53 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/amqp/types_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/types_test.go b/go/src/qpid.apache.org/amqp/types_test.go
index 8f7cd6a..994a3cd 100644
--- a/go/src/qpid.apache.org/amqp/types_test.go
+++ b/go/src/qpid.apache.org/amqp/types_test.go
@@ -44,7 +44,7 @@ func checkUnmarshal(marshaled []byte, v interface{}) error {
return nil
}
-func ExampleKey() {
+func ExampleAnnotationKey() {
var k AnnotationKey = AnnotationKeySymbol(Symbol("foo"))
fmt.Println(k.Get().(Symbol))
k = AnnotationKeyUint64(42)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/electron/common_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go
index e2e4a2a..59fbfff 100644
--- a/go/src/qpid.apache.org/electron/common_test.go
+++ b/go/src/qpid.apache.org/electron/common_test.go
@@ -47,7 +47,7 @@ func fatalIf(t testing.TB, err error) {
func errorIf(t testing.TB, err error) {
if err != nil {
- t.Errorf(decorate(err, 1))
+ t.Error(decorate(err, 1))
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go
index 628d933..48cba60 100644
--- a/go/src/qpid.apache.org/electron/connection.go
+++ b/go/src/qpid.apache.org/electron/connection.go
@@ -25,6 +25,7 @@ import "C"
import (
"crypto/rand"
"encoding/hex"
+ "fmt"
"net"
"sync"
"time"
@@ -310,7 +311,9 @@ func (c *connection) WaitTimeout(timeout time.Duration) error {
}
func (c *connection) Incoming() <-chan Incoming {
- assert(c.incoming != nil, "Incoming() is only allowed for a Connection created with the Server() option: %s", c)
+ if c.incoming == nil {
+ panic(fmt.Errorf("Incoming() only allowed on Connection created with the Server() option: %s", c))
+ }
return c.incoming
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/electron/error.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/error.go b/go/src/qpid.apache.org/electron/error.go
deleted file mode 100644
index 4dcfd94..0000000
--- a/go/src/qpid.apache.org/electron/error.go
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package electron
-
-import (
- "fmt"
-)
-
-// assert panics if condition is false with optional formatted message
-func assert(condition bool, format ...interface{}) {
- if !condition {
- if len(format) > 0 {
- panic(fmt.Errorf(format[0].(string), format[1:]...))
- } else {
- panic(fmt.Errorf("assertion failed"))
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/receiver.go b/go/src/qpid.apache.org/electron/receiver.go
index 26b46a8..8e3b1a7 100644
--- a/go/src/qpid.apache.org/electron/receiver.go
+++ b/go/src/qpid.apache.org/electron/receiver.go
@@ -21,9 +21,10 @@ package electron
import (
"fmt"
+ "time"
+
"qpid.apache.org/amqp"
"qpid.apache.org/proton"
- "time"
)
// Receiver is a Link that receives messages.
@@ -126,7 +127,9 @@ func (r *receiver) Receive() (rm ReceivedMessage, err error) {
}
func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
- assert(r.buffer != nil, "Receiver is not open: %s", r)
+ if r.buffer == nil {
+ panic(fmt.Errorf("Receiver is not open: %s", r))
+ }
if !r.prefetch { // Per-caller flow control
select { // Check for immediate availability, avoid caller() inject
case rm2, ok := <-r.buffer:
@@ -164,7 +167,6 @@ func (r *receiver) message(delivery proton.Delivery) {
localClose(r.pLink, err)
return
}
- assert(m != nil)
r.pLink.Advance()
if r.pLink.Credit() < 0 {
localClose(r.pLink, fmt.Errorf("received message in excess of credit limit"))
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f46076e4/go/src/qpid.apache.org/proton/error.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/error.go b/go/src/qpid.apache.org/proton/error.go
index 5232fec..a6fb026 100644
--- a/go/src/qpid.apache.org/proton/error.go
+++ b/go/src/qpid.apache.org/proton/error.go
@@ -71,7 +71,7 @@ type ErrorHolder struct {
value atomic.Value
}
-// Set the error if not already set, return the error in the Holder.
+// Set the error if not already set
func (e *ErrorHolder) Set(err error) {
if err != nil {
e.once.Do(func() { e.value.Store(err) })
@@ -83,14 +83,3 @@ func (e *ErrorHolder) Get() (err error) {
err, _ = e.value.Load().(error)
return
}
-
-// assert panics if condition is false with optional formatted message
-func assert(condition bool, format ...interface{}) {
- if !condition {
- if len(format) > 0 {
- panic(fmt.Errorf(format[0].(string), format[1:]...))
- } else {
- panic(fmt.Errorf("assertion failed"))
- }
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[06/12] qpid-proton git commit: PROTON-1952: [go] Server connection
fails to authenticate
Posted by ac...@apache.org.
PROTON-1952: [go] Server connection fails to authenticate
Problem: If the Server() option comes after SASL options in the option list, the
server hangs in authentication.
Solution: NewConnection panics if there is a Server() option that is not the first
option in the list.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/55b27351
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/55b27351
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/55b27351
Branch: refs/heads/master
Commit: 55b27351de92258cb4e8d7ba32701b6c0ba93a3c
Parents: c249107
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Oct 5 12:02:28 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:33:08 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/electron/common_test.go | 32 +++++++++---------
go/src/qpid.apache.org/electron/connection.go | 37 +++++++++++++++------
go/src/qpid.apache.org/electron/container.go | 5 +--
3 files changed, 46 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/55b27351/go/src/qpid.apache.org/electron/common_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/common_test.go b/go/src/qpid.apache.org/electron/common_test.go
index 3aad825..e2e4a2a 100644
--- a/go/src/qpid.apache.org/electron/common_test.go
+++ b/go/src/qpid.apache.org/electron/common_test.go
@@ -25,6 +25,7 @@ import (
"path"
"reflect"
"runtime"
+ "sync"
"testing"
)
@@ -69,16 +70,12 @@ type pair struct {
auth connectionSettings
}
-func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []ConnectionOption) *pair {
- opts := append([]ConnectionOption{Server()}, serverOpts...)
- sc, _ := NewConnection(srv, opts...)
- opts = append([]ConnectionOption{}, clientOpts...)
- cc, _ := NewConnection(cli, opts...)
- cs, _ := cc.Session()
+func newPair(t testing.TB, cli, srv Connection) *pair {
+ cs, _ := cli.Session()
p := &pair{
t: t,
client: cs,
- server: sc,
+ server: srv,
capacity: 100,
rchan: make(chan Receiver),
schan: make(chan Sender)}
@@ -107,26 +104,31 @@ func newPair(t testing.TB, cli, srv net.Conn, clientOpts, serverOpts []Connectio
// AMQP pair linked by in-memory pipe
func newPipe(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
cli, srv := net.Pipe()
- return newPair(t, cli, srv, clientOpts, serverOpts)
+ opts := []ConnectionOption{Server(), ContainerId(t.Name() + "-server")}
+ sc, _ := NewConnection(srv, append(opts, serverOpts...)...)
+ opts = []ConnectionOption{ContainerId(t.Name() + "-client")}
+ cc, _ := NewConnection(cli, append(opts, clientOpts...)...)
+ return newPair(t, cc, sc)
}
// AMQP pair linked by TCP socket
func newSocketPair(t testing.TB, clientOpts, serverOpts []ConnectionOption) *pair {
l, err := net.Listen("tcp4", ":0") // For systems with ipv6 disabled
fatalIfN(t, err, 1)
- srvCh := make(chan net.Conn)
+ var srv Connection
var srvErr error
+ var wg sync.WaitGroup
+ wg.Add(1)
go func() {
- var c net.Conn
- c, srvErr = l.Accept()
- srvCh <- c
+ defer wg.Done()
+ srv, srvErr = NewContainer(t.Name()+"-server").Accept(l, serverOpts...)
}()
addr := l.Addr()
- cli, err := net.Dial(addr.Network(), addr.String())
+ cli, err := NewContainer(t.Name()+"-client").Dial(addr.Network(), addr.String(), clientOpts...)
fatalIfN(t, err, 1)
- srv := <-srvCh
+ wg.Wait()
fatalIfN(t, srvErr, 1)
- return newPair(t, cli, srv, clientOpts, serverOpts)
+ return newPair(t, cli, srv)
}
func (p *pair) close() { p.client.Connection().Close(nil); p.server.Close(nil) }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/55b27351/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go
index 0d5e7c5..628d933 100644
--- a/go/src/qpid.apache.org/electron/connection.go
+++ b/go/src/qpid.apache.org/electron/connection.go
@@ -26,9 +26,10 @@ import (
"crypto/rand"
"encoding/hex"
"net"
- "qpid.apache.org/proton"
"sync"
"time"
+
+ "qpid.apache.org/proton"
)
// Settings associated with a Connection.
@@ -147,7 +148,7 @@ func Password(password []byte) ConnectionOption {
// net.Listener.Accept()
//
func Server() ConnectionOption {
- return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) }
+ return func(c *connection) { c.setServer() }
}
// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
@@ -174,13 +175,13 @@ type connection struct {
defaultSessionOnce, closeOnce sync.Once
- container *container
- conn net.Conn
- server bool
- incoming chan Incoming
- handler *handler
- engine *proton.Engine
- pConnection proton.Connection
+ container *container
+ conn net.Conn
+ server, client bool
+ incoming chan Incoming
+ handler *handler
+ engine *proton.Engine
+ pConnection proton.Connection
defaultSession Session
}
@@ -198,8 +199,13 @@ func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)
return nil, err
}
c.pConnection = c.engine.Connection()
- for _, set := range opts {
- set(c)
+ for _, opt := range opts {
+ opt(c)
+ // If the first option is not Server(), then we are a client.
+ // Applying Server() after other options is an error
+ if !c.server {
+ c.client = true
+ }
}
if c.container == nil {
// Generate a random container-id. Not an RFC4122-compliant UUID but probably-unique
@@ -216,6 +222,15 @@ func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error)
return c, nil
}
+// setServer switches c to server mode: it marks the connection as a server,
+// calls Server() on the engine and applies the AllowIncoming() option.
+// It panics if c.client is already set, i.e. Server() was not the first
+// ConnectionOption applied.
+func (c *connection) setServer() {
+ if c.client {
+ panic("electron.Server() must be first in the ConnectionOption list")
+ }
+ c.server = true
+ c.engine.Server()
+ AllowIncoming()(c)
+}
+
func (c *connection) run() {
if !c.server {
c.pConnection.Open()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/55b27351/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/container.go b/go/src/qpid.apache.org/electron/container.go
index 7c19aa5..9990706 100644
--- a/go/src/qpid.apache.org/electron/container.go
+++ b/go/src/qpid.apache.org/electron/container.go
@@ -21,9 +21,10 @@ package electron
import (
"net"
- "qpid.apache.org/proton"
"strconv"
"sync/atomic"
+
+ "qpid.apache.org/proton"
)
// Container is an AMQP container, it represents a single AMQP "application"
@@ -99,7 +100,7 @@ func (cont *container) Dial(network, address string, opts ...ConnectionOption) (
func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c Connection, err error) {
conn, err := l.Accept()
if err == nil {
- c, err = cont.Connection(conn, append(opts, Server())...)
+ c, err = cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...)
}
return
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[08/12] qpid-proton git commit: PROTON-1956: [go] server does not
close transport on unexpected disconnect
Posted by ac...@apache.org.
PROTON-1956: [go] server does not close transport on unexpected disconnect
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2a84494c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2a84494c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2a84494c
Branch: refs/heads/master
Commit: 2a84494c4befb40c674116aeba7e563e68b9537d
Parents: 84844a9
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Oct 9 16:30:05 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:14:20 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/electron/electron_test.go | 7 ++++---
go/src/qpid.apache.org/electron/handler.go | 10 +++++++---
go/src/qpid.apache.org/proton/handlers.go | 7 +++++--
3 files changed, 16 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a84494c/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/electron_test.go b/go/src/qpid.apache.org/electron/electron_test.go
index cf881e6..a648a18 100644
--- a/go/src/qpid.apache.org/electron/electron_test.go
+++ b/go/src/qpid.apache.org/electron/electron_test.go
@@ -21,9 +21,10 @@ package electron
import (
"fmt"
- "qpid.apache.org/amqp"
"testing"
"time"
+
+ "qpid.apache.org/amqp"
)
// Send a message one way with a client sender and server receiver, verify ack.
@@ -325,7 +326,7 @@ func TestHeartbeat(t *testing.T) {
unfreeze <- true // Unfreeze the server
<-p.server.Done()
- if amqp.ResourceLimitExceeded != p.server.Error().(amqp.Error).Name {
- t.Error("bad timeout error:", p.server.Error())
+ if p.server.Error() == nil {
+ t.Error("expected server side time-out or connection abort error")
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a84494c/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/handler.go b/go/src/qpid.apache.org/electron/handler.go
index af1efd6..899d6a9 100644
--- a/go/src/qpid.apache.org/electron/handler.go
+++ b/go/src/qpid.apache.org/electron/handler.go
@@ -126,9 +126,13 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
h.shutdown(proton.EndpointError(e.Connection()))
case proton.MDisconnected:
- err := e.Transport().Condition().Error()
- if err == nil {
- err = amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection)
+ var err error
+ if err = e.Connection().RemoteCondition().Error(); err == nil {
+ if err = e.Connection().Condition().Error(); err == nil {
+ if err = e.Transport().Condition().Error(); err == nil {
+ err = amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection)
+ }
+ }
}
h.shutdown(err)
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2a84494c/go/src/qpid.apache.org/proton/handlers.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/handlers.go b/go/src/qpid.apache.org/proton/handlers.go
index f101548..bbb5014 100644
--- a/go/src/qpid.apache.org/proton/handlers.go
+++ b/go/src/qpid.apache.org/proton/handlers.go
@@ -25,7 +25,6 @@ import "fmt"
type EventHandler interface {
// HandleEvent is called with an event.
// Typically HandleEvent() is implemented as a switch on e.Type()
- // Returning an error will stop the Engine.
HandleEvent(e Event)
}
@@ -38,7 +37,6 @@ type EventHandler interface {
type MessagingHandler interface {
// HandleMessagingEvent is called with MessagingEvent.
// Typically HandleEvent() is implemented as a switch on e.Type()
- // Returning an error will stop the Engine.
HandleMessagingEvent(MessagingEvent, Event)
}
@@ -351,6 +349,11 @@ func (d *MessagingAdapter) HandleEvent(e Event) {
d.outgoing(e)
}
+ case ETransportTailClosed:
+ if !e.Connection().State().RemoteClosed() { // Unexpected transport closed
+ e.Transport().CloseHead() // Complete transport close, no connection close expected
+ }
+
case ETransportClosed:
d.mhandler.HandleMessagingEvent(MDisconnected, e)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[10/12] qpid-proton git commit: PROTON-1910: [go] proton.Link allow
sending/receiving message as bytes
Posted by ac...@apache.org.
PROTON-1910: [go] proton.Link allow sending/receiving message as bytes
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ef716fa0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ef716fa0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ef716fa0
Branch: refs/heads/master
Commit: ef716fa008e709efbab6ed64edfa53f0361c3262
Parents: 2a84494
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 11 15:19:00 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:19:00 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/proton/message.go | 42 ++++++++++++++++++++-------
1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef716fa0/go/src/qpid.apache.org/proton/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/message.go b/go/src/qpid.apache.org/proton/message.go
index fbb1d48..7c166a5 100644
--- a/go/src/qpid.apache.org/proton/message.go
+++ b/go/src/qpid.apache.org/proton/message.go
@@ -26,9 +26,10 @@ import "C"
import (
"fmt"
- "qpid.apache.org/amqp"
"strconv"
"sync/atomic"
+
+ "qpid.apache.org/amqp"
)
// HasMessage is true if all message data is available.
@@ -41,7 +42,23 @@ func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() && !d.Pa
// handling an MMessage event is always a safe context to call this function.
//
// Will return an error if message is incomplete or not current.
-func (delivery Delivery) Message() (m amqp.Message, err error) {
+func (delivery Delivery) Message() (amqp.Message, error) {
+	bytes, err := delivery.MessageBytes()
+	if err != nil {
+		return nil, err // Report the read failure instead of (nil, nil).
+	}
+	m := amqp.NewMessage()
+	err = m.Decode(bytes)
+	return m, err
+}
+
+// MessageBytes extracts the raw message bytes contained in a delivery.
+//
+// Must be called in the correct link context with this delivery as the current message,
+// handling an MMessage event is always a safe context to call this function.
+//
+// Will return an error if message is incomplete or not current.
+func (delivery Delivery) MessageBytes() ([]byte, error) {
if !delivery.Readable() {
return nil, fmt.Errorf("delivery is not readable")
}
@@ -53,9 +70,7 @@ func (delivery Delivery) Message() (m amqp.Message, err error) {
if result != len(data) {
return nil, fmt.Errorf("cannot receive message: %s", PnErrorCode(result))
}
- m = amqp.NewMessage()
- err = m.Decode(data)
- return
+ return data, nil
}
// Process-wide atomic counter for generating tag names
@@ -68,15 +83,22 @@ func nextTag() string {
// Send sends a amqp.Message over a Link.
// Returns a Delivery that can be use to determine the outcome of the message.
func (link Link) Send(m amqp.Message) (Delivery, error) {
+	// Encode first; err is declared once so a failure is not lost to shadowing.
+	bytes, err := m.Encode(nil)
+	if err != nil {
+		return Delivery{}, err
+	}
+	// SendMessageBytes checks the link direction and creates the delivery.
+	return link.SendMessageBytes(bytes)
+}
+
+// SendMessageBytes sends encoded bytes of an amqp.Message over a Link.
+// Returns a Delivery that can be used to determine the outcome of the message.
+func (link Link) SendMessageBytes(bytes []byte) (Delivery, error) {
if !link.IsSender() {
return Delivery{}, fmt.Errorf("attempt to send message on receiving link")
}
-
delivery := link.Delivery(nextTag())
- bytes, err := m.Encode(nil)
- if err != nil {
- return Delivery{}, fmt.Errorf("cannot send message %s", err)
- }
result := link.SendBytes(bytes)
link.Advance()
if result != len(bytes) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[03/12] qpid-proton git commit: NO-JIRA: longer timeout in heartbeat
test, false failures
Posted by ac...@apache.org.
NO-JIRA: longer timeout in heartbeat test, false failures
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/32c7036c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/32c7036c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/32c7036c
Branch: refs/heads/master
Commit: 32c7036cc5745d55241195e4b36e0b516fd57b2c
Parents: f46076e
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Oct 10 16:12:11 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 12:33:08 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/electron/electron_test.go | 8 ++++----
1 file changed, 4 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/32c7036c/go/src/qpid.apache.org/electron/electron_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/electron_test.go b/go/src/qpid.apache.org/electron/electron_test.go
index c8a51c7..cf881e6 100644
--- a/go/src/qpid.apache.org/electron/electron_test.go
+++ b/go/src/qpid.apache.org/electron/electron_test.go
@@ -291,8 +291,8 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
func TestHeartbeat(t *testing.T) {
p := newSocketPair(t,
- []ConnectionOption{Heartbeat(12 * time.Millisecond)},
- []ConnectionOption{Heartbeat(11 * time.Millisecond)})
+ []ConnectionOption{Heartbeat(102 * time.Millisecond)},
+ []ConnectionOption{Heartbeat(101 * time.Millisecond)})
defer func() { p.close() }()
// Function to freeze the server to stop it sending heartbeats.
@@ -301,8 +301,8 @@ func TestHeartbeat(t *testing.T) {
freeze := func() error { return p.server.(*connection).engine.Inject(func() { <-unfreeze }) }
fatalIf(t, p.client.Sync())
- errorIf(t, checkEqual(11*time.Millisecond, p.client.Connection().Heartbeat()))
- errorIf(t, checkEqual(12*time.Millisecond, p.server.Heartbeat()))
+ errorIf(t, checkEqual(101*time.Millisecond, p.client.Connection().Heartbeat()))
+ errorIf(t, checkEqual(102*time.Millisecond, p.server.Heartbeat()))
// Freeze the server for less than a heartbeat
fatalIf(t, freeze())
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[12/12] qpid-proton git commit: PROTON-1910: [go] move message
encode/decode to handler thread
Posted by ac...@apache.org.
PROTON-1910: [go] move message encode/decode to handler thread
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4a9f3b98
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4a9f3b98
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4a9f3b98
Branch: refs/heads/master
Commit: 4a9f3b986693d61040c1db32f46355d04ce75499
Parents: 886d2b9
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 11 15:23:00 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 18:01:44 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/amqp/message.go | 3 -
go/src/qpid.apache.org/electron/connection.go | 10 +-
go/src/qpid.apache.org/electron/handler.go | 32 +++---
go/src/qpid.apache.org/electron/link.go | 2 +-
go/src/qpid.apache.org/electron/receiver.go | 7 +-
go/src/qpid.apache.org/electron/sender.go | 128 ++++++++++++---------
6 files changed, 106 insertions(+), 76 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message.go b/go/src/qpid.apache.org/amqp/message.go
index 919904c..e96b331 100644
--- a/go/src/qpid.apache.org/amqp/message.go
+++ b/go/src/qpid.apache.org/amqp/message.go
@@ -325,9 +325,6 @@ func (m *message) SetApplicationProperties(x map[string]interface{}) {
func (m *message) Marshal(v interface{}) { m.body = v }
func (m *message) Unmarshal(v interface{}) {
- // FIXME aconway 2018-09-28: this is inefficient, replace with a
- // reflective conversion from the existing body value that respects
- // the Unmarshal() rules.
pnData := C.pn_data(2)
marshal(m.body, pnData)
unmarshal(v, pnData)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/connection.go b/go/src/qpid.apache.org/electron/connection.go
index 48cba60..464c4df 100644
--- a/go/src/qpid.apache.org/electron/connection.go
+++ b/go/src/qpid.apache.org/electron/connection.go
@@ -30,6 +30,7 @@ import (
"sync"
"time"
+ "qpid.apache.org/amqp"
"qpid.apache.org/proton"
)
@@ -183,6 +184,7 @@ type connection struct {
handler *handler
engine *proton.Engine
pConnection proton.Connection
+ mc amqp.MessageCodec
defaultSession Session
}
@@ -244,8 +246,12 @@ func (c *connection) run() {
}
func (c *connection) Close(err error) {
- c.err.Set(err)
- c.engine.Close(err)
+ c.closeOnce.Do(func() {
+ c.err.Set(err)
+ c.engine.Close(err)
+ c.mc.Close()
+ })
+
}
func (c *connection) Disconnect(err error) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/handler.go b/go/src/qpid.apache.org/electron/handler.go
index 899d6a9..753f9d3 100644
--- a/go/src/qpid.apache.org/electron/handler.go
+++ b/go/src/qpid.apache.org/electron/handler.go
@@ -27,19 +27,19 @@ import (
// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
type handler struct {
- delegator *proton.MessagingAdapter
- connection *connection
- links map[proton.Link]Endpoint
- sentMessages map[proton.Delivery]sentMessage
- sessions map[proton.Session]*session
+ delegator *proton.MessagingAdapter
+ connection *connection
+ links map[proton.Link]Endpoint
+ sent map[proton.Delivery]*sendable // Waiting for outcome
+ sessions map[proton.Session]*session
}
func newHandler(c *connection) *handler {
h := &handler{
- connection: c,
- links: make(map[proton.Link]Endpoint),
- sentMessages: make(map[proton.Delivery]sentMessage),
- sessions: make(map[proton.Session]*session),
+ connection: c,
+ links: make(map[proton.Link]Endpoint),
+ sent: make(map[proton.Delivery]*sendable),
+ sessions: make(map[proton.Session]*session),
}
h.delegator = proton.NewMessagingAdapter(h)
// Disable auto features of MessagingAdapter, we do these ourselves.
@@ -65,15 +65,15 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
}
case proton.MSettled:
- if sm, ok := h.sentMessages[e.Delivery()]; ok {
+ if sm, ok := h.sent[e.Delivery()]; ok {
d := e.Delivery().Remote()
- sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value}
- delete(h.sentMessages, e.Delivery())
+ sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.v}
+ delete(h.sent, e.Delivery())
}
case proton.MSendable:
if s, ok := h.links[e.Link()].(*sender); ok {
- s.sendable()
+ s.trySend()
} else {
h.linkError(e.Link(), "no sender")
}
@@ -182,10 +182,10 @@ func (h *handler) sessionClosed(ps proton.Session, err error) {
func (h *handler) shutdown(err error) {
err = h.connection.closed(err)
- for _, sm := range h.sentMessages {
+ for _, sm := range h.sent {
// Don't block but ensure outcome is sent eventually.
if sm.ack != nil {
- o := Outcome{Unacknowledged, err, sm.value}
+ o := Outcome{Unacknowledged, err, sm.v}
select {
case sm.ack <- o:
default:
@@ -193,7 +193,7 @@ func (h *handler) shutdown(err error) {
}
}
}
- h.sentMessages = nil
+ h.sent = nil
for _, l := range h.links {
_ = l.closed(err)
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/link.go b/go/src/qpid.apache.org/electron/link.go
index de8a995..dd974f5 100644
--- a/go/src/qpid.apache.org/electron/link.go
+++ b/go/src/qpid.apache.org/electron/link.go
@@ -146,7 +146,7 @@ const (
// Messages are sent already settled
SndSettled = SndSettleMode(proton.SndSettled)
// Sender can send either unsettled or settled messages.
- SendMixed = SndSettleMode(proton.SndMixed)
+ SndMixed = SndSettleMode(proton.SndMixed)
)
// RcvSettleMode defines when the receiving end of the link settles message delivery.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/receiver.go b/go/src/qpid.apache.org/electron/receiver.go
index 8e3b1a7..eba300b 100644
--- a/go/src/qpid.apache.org/electron/receiver.go
+++ b/go/src/qpid.apache.org/electron/receiver.go
@@ -162,7 +162,12 @@ func (r *receiver) message(delivery proton.Delivery) {
return
}
if delivery.HasMessage() {
- m, err := delivery.Message()
+ bytes, err := delivery.MessageBytes()
+ var m amqp.Message
+ if err == nil {
+ m = amqp.NewMessage()
+ err = r.session.connection.mc.Decode(m, bytes)
+ }
if err != nil {
localClose(r.pLink, err)
return
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4a9f3b98/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/electron/sender.go b/go/src/qpid.apache.org/electron/sender.go
index f46fdc4..98e6c9b 100644
--- a/go/src/qpid.apache.org/electron/sender.go
+++ b/go/src/qpid.apache.org/electron/sender.go
@@ -24,9 +24,10 @@ import "C"
import (
"fmt"
+ "time"
+
"qpid.apache.org/amqp"
"qpid.apache.org/proton"
- "time"
)
// Sender is a Link that sends messages.
@@ -148,54 +149,86 @@ func sentStatus(d uint64) SentStatus {
}
}
-// Sender implementation, held by handler.
+type sendable struct {
+ m amqp.Message // Message to be encoded and sent
+ ack chan<- Outcome // Channel for acknowledgement of m
+ v interface{} // Correlation value, reported back in the Outcome
+ sent chan struct{} // Closed once m is encoded, or the send is timed out or the sender closed
+}
+
+func (sm *sendable) unsent(err error) {
+ Outcome{Unsent, err, sm.v}.send(sm.ack)
+}
+
type sender struct {
link
- credit chan struct{} // Signal available credit.
+ sending []*sendable
}
-func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) {
- // wait for credit
- if _, err := timedReceive(s.credit, t); err != nil {
- if err == Closed && s.Error() != nil {
- err = s.Error()
- }
- Outcome{Unsent, err, v}.send(ack)
+func newSender(ls linkSettings) *sender {
+ s := &sender{link: link{linkSettings: ls}}
+ s.endpoint.init(s.link.pLink.String())
+ s.handler().addLink(s.pLink, s)
+ s.link.pLink.Open()
+ return s
+}
+
+// Called in handler goroutine
+func (s *sender) startSend(sm *sendable) {
+ s.sending = append(s.sending, sm)
+ s.trySend()
+}
+
+// Called in handler goroutine
+func (s *sender) trySend() {
+ for s.pLink.Credit() > 0 && len(s.sending) > 0 {
+ sm := s.sending[0]
+ s.sending = s.sending[1:]
+ s.send(sm)
+ }
+}
+
+// Called in handler goroutine with credit > 0
+func (s *sender) send(sm *sendable) {
+ var err error
+ bytes, err := s.session.connection.mc.Encode(sm.m, nil)
+ close(sm.sent) // Safe to re-use sm.m now
+ if err != nil {
+ sm.unsent(err)
return
}
- // Send a message in handler goroutine
- err := s.engine().Inject(func() {
- if s.Error() != nil {
- Outcome{Unsent, s.Error(), v}.send(ack)
- return
- }
+ d, err := s.pLink.SendMessageBytes(bytes)
+ if err != nil {
+ sm.unsent(err)
+ return
+ }
+ if s.SndSettle() == SndSettled || (s.SndSettle() == SndMixed && sm.ack == nil) {
+ d.Settle() // Pre-settled
+ Outcome{Accepted, nil, sm.v}.send(sm.ack) // Assume accepted
+ } else {
+ // Register with handler to receive the remote outcome
+ s.handler().sent[d] = sm
+ }
+}
- delivery, err2 := s.pLink.Send(m)
- switch {
- case err2 != nil:
- Outcome{Unsent, err2, v}.send(ack)
- case ack == nil || s.SndSettle() == SndSettled: // Pre-settled
- if s.SndSettle() != SndUnsettled { // Not forced to send unsettled by link policy
- delivery.Settle()
- }
- Outcome{Accepted, nil, v}.send(ack) // Assume accepted
- default:
- s.handler().sentMessages[delivery] = sentMessage{ack, v} // Register with handler
- }
- if s.pLink.Credit() > 0 { // Signal there is still credit
- s.sendable()
+func (s *sender) timeoutSend(sm *sendable) {
+ for i, sm2 := range s.sending {
+ if sm2 == sm {
+ n := copy(s.sending[i:], s.sending[i+1:])
+ s.sending = s.sending[:i+n] // delete
+ close(sm.sent)
+ return
}
- })
- if err != nil {
- Outcome{Unsent, err, v}.send(ack)
}
}
-// Set credit flag if not already set. Non-blocking, any goroutine
-func (s *sender) sendable() {
- select { // Non-blocking
- case s.credit <- struct{}{}:
- default:
+func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) {
+ sm := &sendable{m, ack, v, make(chan struct{})}
+ s.engine().Inject(func() { s.startSend(sm) })
+ select {
+ case <-sm.sent: // OK
+ case <-After(t): // Try to timeout sm
+ s.engine().Inject(func() { s.timeoutSend(sm) })
}
}
@@ -244,24 +277,13 @@ func (s *sender) SendSync(m amqp.Message) Outcome {
// handler goroutine
func (s *sender) closed(err error) error {
- close(s.credit)
+ for _, sm := range s.sending {
+ close(sm.sent)
+ }
+ s.sending = nil
return s.link.closed(err)
}
-func newSender(ls linkSettings) *sender {
- s := &sender{link: link{linkSettings: ls}, credit: make(chan struct{}, 1)}
- s.endpoint.init(s.link.pLink.String())
- s.handler().addLink(s.pLink, s)
- s.link.pLink.Open()
- return s
-}
-
-// sentMessage records a sent message on the handler.
-type sentMessage struct {
- ack chan<- Outcome
- value interface{}
-}
-
// IncomingSender is sent on the Connection.Incoming() channel when there is
// an incoming request to open a sender link.
type IncomingSender struct {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org