You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/18 20:48:10 UTC
[2/2] qpid-proton git commit: PROTON-1047: go: improved ack handling
in electron API, add to broker example
PROTON-1047: go: improved ack handling in electron API, add to broker example
Sender interface changed
- SendAsync: flexible and efficient server-side ack handling across multiple Senders.
- SendSync, SendForget, SendWaitable: easy-to-use methods for common client cases.
electron broker.go, send.go examples demonstrate async send handling.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/cd8ad96f
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/cd8ad96f
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/cd8ad96f
Branch: refs/heads/master
Commit: cd8ad96f2b3022344198abe8b76f0798758dbe9d
Parents: 41eac74
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 11 15:03:07 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 18 14:27:57 2015 -0500
----------------------------------------------------------------------
examples/go/electron/broker.go | 133 +++++--
examples/go/electron/receive.go | 11 +-
examples/go/electron/send.go | 25 +-
examples/go/proton/broker.go | 9 +-
examples/go/util/queue.go | 10 +
.../go/src/qpid.apache.org/amqp/error.go | 2 +-
.../src/qpid.apache.org/electron/connection.go | 27 +-
.../go/src/qpid.apache.org/electron/doc.go | 3 +-
.../go/src/qpid.apache.org/electron/endpoint.go | 31 +-
.../go/src/qpid.apache.org/electron/handler.go | 16 +-
.../go/src/qpid.apache.org/electron/link.go | 23 +-
.../qpid.apache.org/electron/messaging_test.go | 71 ++--
.../go/src/qpid.apache.org/electron/receiver.go | 29 +-
.../go/src/qpid.apache.org/electron/sender.go | 361 +++++++++----------
.../go/src/qpid.apache.org/electron/session.go | 21 +-
.../go/src/qpid.apache.org/electron/time.go | 3 +-
.../go/src/qpid.apache.org/proton/engine.go | 7 +
17 files changed, 430 insertions(+), 352 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index f68deb1..66941b7 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -33,6 +33,7 @@ import (
"log"
"net"
"os"
+ "qpid.apache.org/amqp"
"qpid.apache.org/electron"
)
@@ -52,7 +53,12 @@ var qsize = flag.Int("qsize", 1000, "Max queue size")
func main() {
flag.Usage = usage
flag.Parse()
- b := &broker{util.MakeQueues(*qsize), electron.NewContainer("")}
+ b := &broker{
+ queues: util.MakeQueues(*qsize),
+ container: electron.NewContainer(""),
+ acks: make(chan electron.Outcome),
+ sent: make(chan sentMessage),
+ }
if err := b.run(); err != nil {
log.Fatal(err)
}
@@ -60,18 +66,31 @@ func main() {
// State for the broker
type broker struct {
- queues util.Queues
- container electron.Container
+ queues util.Queues // A collection of queues.
+ container electron.Container // electron.Container manages AMQP connections.
+ sent chan sentMessage // Channel to record sent messages.
+ acks chan electron.Outcome // Channel to receive the Outcome of sent messages.
+}
+
+// Record of a sent message and the queue it came from.
+// If a message is rejected or not acknowledged due to a failure, we will put it back on the queue.
+type sentMessage struct {
+ m amqp.Message
+ q util.Queue
}
-// Listens for connections and starts an electron.Connection for each one.
+// run listens for incoming net.Conn connections and starts an electron.Connection for each one.
func (b *broker) run() error {
listener, err := net.Listen("tcp", *addr)
if err != nil {
return err
}
defer listener.Close()
- fmt.Printf("Listening on %s\n", listener.Addr())
+ fmt.Printf("Listening on %v\n", listener.Addr())
+
+ go b.acknowledgements() // Handles acknowledgements for all connections.
+
+ // Start a goroutine for each new connection
for {
conn, err := listener.Accept()
if err != nil {
@@ -83,69 +102,107 @@ func (b *broker) run() error {
util.Debugf("Connection error: %v", err)
continue
}
- go b.accept(c) // Goroutine to accept incoming sessions and links.
+ cc := &connection{b, c}
+ go cc.run() // Handle the connection
util.Debugf("Accepted %v", c)
}
}
-// accept remotely-opened endpoints (Session, Sender and Receiver)
+// State for a broker connection
+type connection struct {
+ broker *broker
+ connection electron.Connection
+}
+
+// accept remotely-opened endpoints (Session, Sender and Receiver) on a connection
// and start goroutines to service them.
-func (b *broker) accept(c electron.Connection) {
- for in := range c.Incoming() {
+func (c *connection) run() {
+ for in := range c.connection.Incoming() {
switch in := in.(type) {
case *electron.IncomingSender:
if in.Source() == "" {
- util.Debugf("sender has no source: %s", in)
- break
+ in.Reject(fmt.Errorf("no source"))
+ } else {
+ go c.sender(in.Accept().(electron.Sender))
}
- go b.sender(in.Accept().(electron.Sender))
case *electron.IncomingReceiver:
if in.Target() == "" {
- util.Debugf("receiver has no target: %s", in)
- break
+ in.Reject(fmt.Errorf("no target"))
+ } else {
+ in.SetPrefetch(true)
+ in.SetCapacity(*credit) // Pre-fetch up to credit window.
+ go c.receiver(in.Accept().(electron.Receiver))
}
- in.SetPrefetch(true)
- in.SetCapacity(*credit) // Pre-fetch up to credit window.
- go b.receiver(in.Accept().(electron.Receiver))
default:
in.Accept() // Accept sessions unconditionally
}
}
- util.Debugf("incoming closed: %s", c)
+ util.Debugf("incoming closed: %v", c.connection)
+}
+
+// receiver receives messages and pushes to a queue.
+func (c *connection) receiver(receiver electron.Receiver) {
+ q := c.broker.queues.Get(receiver.Target())
+ for {
+ if rm, err := receiver.Receive(); err == nil {
+ util.Debugf("%v: received %v", receiver, util.FormatMessage(rm.Message))
+ q <- rm.Message
+ rm.Accept()
+ } else {
+ util.Debugf("%v error: %v", receiver, err)
+ break
+ }
+ }
}
// sender pops messages from a queue and sends them.
-func (b *broker) sender(sender electron.Sender) {
- q := b.queues.Get(sender.Source())
+func (c *connection) sender(sender electron.Sender) {
+ q := c.broker.queues.Get(sender.Source())
for {
- m, ok := <-q
- if !ok { // Queue closed
+ if sender.Error() != nil {
+ util.Debugf("%v closed: %v", sender, sender.Error())
return
}
- if err := sender.SendForget(m); err == nil {
- util.Debugf("%s send: %s", sender, util.FormatMessage(m))
- } else {
- util.Debugf("%s error: %s", sender, err)
- q <- m // Put it back on the queue.
- return
+ select {
+
+ case m := <-q:
+ util.Debugf("%v: sent %v", sender, util.FormatMessage(m))
+ sm := sentMessage{m, q}
+ c.broker.sent <- sm // Record sent message
+ sender.SendAsync(m, c.broker.acks, sm) // Receive outcome on c.broker.acks with Value sm
+
+ case <-sender.Done(): // break if sender is closed
+ break
}
}
}
-// receiver receives messages and pushes to a queue.
-func (b *broker) receiver(receiver electron.Receiver) {
- q := b.queues.Get(receiver.Target())
+// acknowledgements keeps track of sent messages and receives outcomes.
+//
+// We could have handled outcomes separately per-connection, per-sender or even
+// per-message. Message outcomes are returned via channels defined by the user
+// so they can be grouped in any way that suits the application.
+func (b *broker) acknowledgements() {
+ sentMap := make(map[sentMessage]bool)
for {
- if rm, err := receiver.Receive(); err == nil {
- util.Debugf("%s: received %s", receiver, util.FormatMessage(rm.Message))
- q <- rm.Message
- rm.Accept()
- } else {
- util.Debugf("%s error: %s", receiver, err)
- break
+ select {
+ case sm, ok := <-b.sent: // A local sender records that it has sent a message.
+ if ok {
+ sentMap[sm] = true
+ } else {
+ return // Closed
+ }
+ case outcome := <-b.acks: // The message outcome is available
+ sm := outcome.Value.(sentMessage)
+ delete(sentMap, sm)
+ if outcome.Status != electron.Accepted { // Error, release or rejection
+ sm.q.PutBack(sm.m) // Put the message back on the queue.
+ util.Debugf("message %v put back, status %v, error %v",
+ util.FormatMessage(sm.m), outcome.Status, outcome.Error)
+ }
}
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index f7d41fa..d631726 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -85,15 +85,12 @@ func main() {
// Loop receiving messages and sending them to the main() goroutine
for {
- rm, err := r.Receive()
- switch err {
- case electron.Closed:
- util.Debugf("closed %s", urlStr)
+ if rm, err := r.Receive(); err != nil {
+ util.Debugf("closed %v: %v", urlStr, err)
return
- case nil:
+ } else {
+ rm.Accept()
messages <- rm.Message
- default:
- log.Fatal(err)
}
}
}(urlStr)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index c9bdbc9..4c9a0bf 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -42,11 +42,6 @@ Send messages to each URL concurrently with body "<url-path>-<n>" where n is the
var count = flag.Int64("count", 1, "Send this may messages per address.")
-type sent struct {
- name string
- sentMessage electron.SentMessage
-}
-
func main() {
flag.Usage = usage
flag.Parse()
@@ -58,9 +53,10 @@ func main() {
os.Exit(1)
}
- sentChan := make(chan sent) // Channel to receive all the delivery receipts.
- var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
- wait.Add(len(urls)) // Wait for one goroutine per URL.
+ sentChan := make(chan electron.Outcome) // Channel to receive acknowledgements.
+
+ var wait sync.WaitGroup
+ wait.Add(len(urls)) // Wait for one goroutine per URL.
_, prog := path.Split(os.Args[0])
container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid()))
@@ -91,9 +87,7 @@ func main() {
m := amqp.NewMessage()
body := fmt.Sprintf("%v-%v", url.Path, i)
m.Marshal(body)
- sentMessage, err := s.Send(m)
- util.ExitIf(err)
- sentChan <- sent{body, sentMessage}
+ s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
}
}(urlStr)
}
@@ -102,12 +96,11 @@ func main() {
expect := int(*count) * len(urls)
util.Debugf("Started senders, expect %v acknowledgements\n", expect)
for i := 0; i < expect; i++ {
- d := <-sentChan
- disposition, err := d.sentMessage.Disposition()
- if err != nil {
- util.Debugf("acknowledgement[%v] %v error: %v\n", i, d.name, err)
+ out := <-sentChan // Outcome of async sends.
+ if out.Error != nil {
+ util.Debugf("acknowledgement[%v] %v error: %v\n", i, out.Value, out.Error)
} else {
- util.Debugf("acknowledgement[%v] %v (%v)\n", i, d.name, disposition)
+ util.Debugf("acknowledgement[%v] %v (%v)\n", i, out.Value, out.Status)
}
}
fmt.Printf("Received all %v acknowledgements\n", expect)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/proton/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go
index 3eb5880..6458665 100644
--- a/examples/go/proton/broker.go
+++ b/examples/go/proton/broker.go
@@ -24,6 +24,9 @@ under the License.
// messages to queues or subscribe to receive messages from them.
//
+// TODO: show how to handle acknowledgements from receivers and put rejected or
+// un-acknowledged messages back on their queues.
+
package main
import (
@@ -277,7 +280,7 @@ func (s *sender) sendable() {
// run runs in a separate goroutine. It monitors the queue for messages and injects
// a function to send them when there is credit
func (s *sender) run() {
- var q chan amqp.Message // q is nil initially as we have no credit.
+ var q util.Queue // q is nil initially as we have no credit.
for {
select {
case _, ok := <-s.credit:
@@ -293,7 +296,7 @@ func (s *sender) run() {
q = nil // Assume all credit will be used used, will be signaled otherwise.
s.h.injecter.Inject(func() { // Inject handler function to actually send
if s.h.senders[s.l] != s { // The sender has been closed by the remote end.
- go func() { q <- m }() // Put the message back on the queue but don't block
+ q.PutBack(m) // Put the message back on the queue but don't block
return
}
if s.sendOne(m) != nil {
@@ -322,7 +325,7 @@ func (s *sender) sendOne(m amqp.Message) error {
delivery.Settle() // Pre-settled, unreliable.
util.Debugf("link %s sent %s", s.l, util.FormatMessage(m))
} else {
- go func() { s.q <- m }() // Put the message back on the queue but don't block
+ s.q.PutBack(m) // Put the message back on the queue, don't block
}
return err
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/examples/go/util/queue.go
----------------------------------------------------------------------
diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go
index d844c0d..2eaba72 100644
--- a/examples/go/util/queue.go
+++ b/examples/go/util/queue.go
@@ -27,6 +27,16 @@ import (
// Use a buffered channel as a very simple queue.
type Queue chan amqp.Message
+// Put a message back on the queue, does not block.
+func (q Queue) PutBack(m amqp.Message) {
+ select {
+ case q <- m:
+ default:
+ // Not an efficient implementation but ensures we don't block the caller.
+ go func() { q <- m }()
+ }
+}
+
// Concurrent-safe map of queues.
type Queues struct {
queueSize int
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
index 4096cdc..349fc41 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
@@ -39,7 +39,7 @@ import (
type Error struct{ Name, Description string }
// Error implements the Go error interface for AMQP error errors.
-func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) }
+func (c Error) Error() string { return fmt.Sprintf("%s: %s", c.Name, c.Description) }
// Errorf makes a Error with name and formatted description as per fmt.Sprintf
func Errorf(name, format string, arg ...interface{}) Error {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 3462b92..8a9e6cd 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -69,10 +69,10 @@ type Connection interface {
// to close it with an error. The specific Incoming types have additional
// methods to configure the endpoint.
//
- // Delay in receiving from Incoming() or calling Accept/Reject will block
- // proton. Normally you should have a dedicated goroutine receive from this
- // channel and start a new goroutine to serve each endpoint accepted. The
- // channel is closed when the Connection closes.
+ // Not receiving from Incoming() or not calling Accept/Reject will block the
+ // electron event loop. Normally you would have a dedicated goroutine receive
+ // from Incoming() and start new goroutines to serve each incoming endpoint.
+ // The channel is closed when the Connection closes.
//
Incoming() <-chan Incoming
}
@@ -103,15 +103,13 @@ type connection struct {
incoming chan Incoming
handler *handler
engine *proton.Engine
- err proton.ErrorHolder
eConnection proton.Connection
defaultSession Session
- done chan struct{}
}
func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption) (*connection, error) {
- c := &connection{container: cont, conn: conn, done: make(chan struct{})}
+ c := &connection{container: cont, conn: conn}
c.handler = newHandler(c)
var err error
c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
@@ -121,12 +119,20 @@ func newConnection(conn net.Conn, cont *container, setting ...ConnectionOption)
for _, set := range setting {
set(c)
}
- c.str = c.engine.String()
+ c.endpoint = makeEndpoint(c.engine.String())
c.eConnection = c.engine.Connection()
- go func() { c.engine.Run(); close(c.done) }()
+ go c.run()
return c, nil
}
+func (c *connection) run() {
+ c.engine.Run()
+ if c.incoming != nil {
+ close(c.incoming)
+ }
+ c.closed(Closed)
+}
+
func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) }
@@ -134,6 +140,9 @@ func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect
func (c *connection) Session(setting ...SessionOption) (Session, error) {
var s Session
err := c.engine.InjectWait(func() error {
+ if c.Error() != nil {
+ return c.Error()
+ }
eSession, err := c.engine.Connection().Session()
if err == nil {
eSession.Open()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index eaa6e7a..a484900 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -35,7 +35,8 @@ Send() and Receive() messages.
You can create an AMQP server connection by calling Connection.Server() and
Connection.Listen() before calling Connection.Open(). A server connection can
negotiate protocol security details and can accept incoming links opened from
-the remote end of the connection
+the remote end of the connection.
+
*/
package electron
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index 057e572..f04b240 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -48,15 +48,31 @@ type Endpoint interface {
// Connection containing the endpoint
Connection() Connection
+
+ // Done returns a channel that will close when the endpoint closes.
+ // Error() will contain the reason.
+ Done() <-chan struct{}
}
type endpoint struct {
- err proton.ErrorHolder
- str string // Must be set by the value that embeds endpoint.
+ err proton.ErrorHolder
+ str string // Must be set by the value that embeds endpoint.
+ done chan struct{}
+}
+
+func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} }
+
+func (e *endpoint) closed(err error) {
+ e.err.Set(err)
+ e.err.Set(Closed)
+ close(e.done)
}
func (e *endpoint) String() string { return e.str }
-func (e *endpoint) Error() error { return e.err.Get() }
+
+func (e *endpoint) Error() error { return e.err.Get() }
+
+func (e *endpoint) Done() <-chan struct{} { return e.done }
// Call in proton goroutine to close an endpoint locally
// handler will complete the close when remote end closes.
@@ -65,3 +81,12 @@ func localClose(ep proton.Endpoint, err error) {
proton.CloseError(ep, err)
}
}
+
+// Used to indicate that a channel has closed which normally is because the endpoint is closed.
+func errorOrClosed(e Endpoint) error {
+ if e.Error() != nil {
+ return e.Error()
+ } else {
+ return Closed
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 1b1164c..1586026 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -30,7 +30,7 @@ type handler struct {
delegator *proton.MessagingAdapter
connection *connection
links map[proton.Link]Link
- sentMessages map[proton.Delivery]*sentMessage
+ sentMessages map[proton.Delivery]sentMessage
sessions map[proton.Session]*session
}
@@ -38,7 +38,7 @@ func newHandler(c *connection) *handler {
h := &handler{
connection: c,
links: make(map[proton.Link]Link),
- sentMessages: make(map[proton.Delivery]*sentMessage),
+ sentMessages: make(map[proton.Delivery]sentMessage),
sessions: make(map[proton.Session]*session),
}
h.delegator = proton.NewMessagingAdapter(h)
@@ -64,8 +64,10 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
}
case proton.MSettled:
- if sm := h.sentMessages[e.Delivery()]; sm != nil {
- sm.settled(nil)
+ if sm, ok := h.sentMessages[e.Delivery()]; ok {
+ d := e.Delivery().Remote()
+ sm.ack <- Outcome{sentStatus(d.Type()), d.Condition().Error(), sm.value}
+ delete(h.sentMessages, e.Delivery())
}
case proton.MSendable:
@@ -123,15 +125,19 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
err := h.connection.Error()
+
for l, _ := range h.links {
h.linkClosed(l, err)
}
+ h.links = nil
for _, s := range h.sessions {
s.closed(err)
}
+ h.sessions = nil
for _, sm := range h.sentMessages {
- sm.settled(err)
+ sm.ack <- Outcome{Unacknowledged, err, sm.value}
}
+ h.sentMessages = nil
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index cadc2c1..91efa8e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -144,7 +144,6 @@ type link struct {
session *session
eLink proton.Link
- done chan struct{} // Closed when link is closed
}
func (l *link) Source() string { return l.source }
@@ -167,7 +166,6 @@ func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error)
isSender: isSender,
capacity: 1,
prefetch: false,
- done: make(chan struct{}),
}
for _, set := range setting {
set(&l)
@@ -188,8 +186,8 @@ func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error)
l.eLink.Target().SetAddress(l.target)
l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
- l.str = l.eLink.String()
l.eLink.Open()
+ l.endpoint = makeEndpoint(l.eLink.String())
return l, nil
}
@@ -213,23 +211,18 @@ func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
capacity: 1,
prefetch: false,
- done: make(chan struct{}),
+ endpoint: makeEndpoint(eLink.String()),
},
}
- l.str = eLink.String()
return l
}
-// Called in proton goroutine on closed or disconnected
-func (l *link) closed(err error) {
- l.err.Set(err)
- l.err.Set(Closed) // If no error set, mark as closed.
- close(l.done)
-}
-
// Not part of Link interface but use by Sender and Receiver.
func (l *link) Credit() (credit int, err error) {
err = l.engine().InjectWait(func() error {
+ if l.Error() != nil {
+ return l.Error()
+ }
credit = l.eLink.Credit()
return nil
})
@@ -240,7 +233,11 @@ func (l *link) Credit() (credit int, err error) {
func (l *link) Capacity() int { return l.capacity }
func (l *link) Close(err error) {
- l.engine().Inject(func() { localClose(l.eLink, err) })
+ l.engine().Inject(func() {
+ if l.Error() == nil {
+ localClose(l.eLink, err)
+ }
+ })
}
func (l *link) open() {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
index 3b5a062..5af57e8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/messaging_test.go
@@ -80,7 +80,6 @@ func closeClientServer(client Session, server Connection) {
// Send a message one way with a client sender and server receiver, verify ack.
func TestClientSendServerReceive(t *testing.T) {
- timeout := time.Second * 2
nLinks := 3
nMessages := 3
@@ -116,15 +115,14 @@ func TestClientSendServerReceive(t *testing.T) {
for i := 0; i < nLinks; i++ {
for j := 0; j < nMessages; j++ {
- var sm SentMessage
-
// Client send
+ ack := make(chan Outcome, 1)
sendDone := make(chan struct{})
go func() {
defer close(sendDone)
m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
var err error
- sm, err = s[i].Send(m)
+ s[i].SendAsync(m, ack, "testing")
if err != nil {
t.Fatal(err)
}
@@ -141,16 +139,19 @@ func TestClientSendServerReceive(t *testing.T) {
// Should not be acknowledged on client yet
<-sendDone
- if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d {
- t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err)
+ select {
+ case <-ack:
+ t.Errorf("unexpected ack")
+ default:
}
- // Server ack
- if err := rm.Acknowledge(Rejected); err != nil {
+
+ // Server send ack
+ if err := rm.Reject(); err != nil {
t.Error(err)
}
// Client get ack.
- if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d {
- t.Errorf("want [rejected/nil] got [%v/%v]", d, err)
+ if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected {
+ t.Error("unexpected ack: ", a.Status, a.Error, a.Value)
}
}
}
@@ -166,12 +167,10 @@ func TestClientReceiver(t *testing.T) {
s := in.Accept().(Sender)
go func() {
for i := int32(0); i < int32(nMessages); i++ {
- sm, err := s.Send(amqp.NewMessageWith(i))
- if err != nil {
- t.Error(err)
+ out := s.SendSync(amqp.NewMessageWith(i))
+ if out.Error != nil {
+ t.Error(out.Error)
return
- } else {
- sm.Disposition() // Sync send.
}
}
s.Close(nil)
@@ -235,10 +234,10 @@ func TestTimeouts(t *testing.T) {
short := time.Millisecond
long := time.Second
m := amqp.NewMessage()
- if _, err = snd.SendTimeout(m, 0); err != Timeout { // No credit, expect timeout.
+ if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout.
t.Error("want Timeout got", err)
}
- if _, err = snd.SendTimeout(m, short); err != Timeout { // No credit, expect timeout.
+ if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout.
t.Error("want Timeout got", err)
}
// Test receive with timeout
@@ -250,17 +249,15 @@ func TestTimeouts(t *testing.T) {
t.Error("want Timeout got", err)
}
// There is now a credit on the link due to receive
- sm, err := snd.SendTimeout(m, long)
- if err != nil {
- t.Fatal(err)
- }
+ ack := make(chan Outcome)
+ snd.SendAsyncTimeout(m, ack, nil, short)
// Disposition should timeout
- if _, err = sm.DispositionTimeout(long); err != Timeout {
- t.Error("want Timeout got", err)
- }
- if _, err = sm.DispositionTimeout(short); err != Timeout {
- t.Error("want Timeout got", err)
+ select {
+ case <-ack:
+ t.Errorf("want Timeout got %#v", ack)
+ case <-time.After(short):
}
+
// Receive and accept
rm, err := rcv.ReceiveTimeout(long)
if err != nil {
@@ -268,9 +265,8 @@ func TestTimeouts(t *testing.T) {
}
rm.Accept()
// Sender get ack
- d, err := sm.DispositionTimeout(long)
- if err != nil || d != Accepted {
- t.Errorf("want (rejected, nil) got (%v, %v)", d, err)
+ if a := <-ack; a.Status != Accepted || a.Error != nil {
+ t.Errorf("want (accepted, nil) got %#v", a)
}
}
@@ -329,7 +325,7 @@ type result struct {
func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) }
func doSend(snd Sender, results chan result) {
- _, err := snd.Send(amqp.NewMessage())
+ err := snd.SendSync(amqp.NewMessage()).Error
results <- result{"send", err}
}
@@ -338,9 +334,8 @@ func doReceive(rcv Receiver, results chan result) {
results <- result{"receive", err}
}
-func doDisposition(sm SentMessage, results chan result) {
- _, err := sm.Disposition()
- results <- result{"disposition", err}
+func doDisposition(ack <-chan Outcome, results chan result) {
+ results <- result{"disposition", (<-ack).Error}
}
// Test that closing Links interrupts blocked link functions.
@@ -391,9 +386,8 @@ func TestConnectionCloseInterrupt1(t *testing.T) {
// Connection.Close() interrupts Send, Receive, Disposition.
snd, rcv := pairs.senderReceiver()
go doReceive(rcv, results)
- sm, err := snd.Send(amqp.NewMessage())
- fatalIf(t, err)
- go doDisposition(sm, results)
+ ack := snd.SendWaitable(amqp.NewMessage())
+ go doDisposition(ack, results)
snd, rcv = pairs.senderReceiver()
go doSend(snd, results)
rcv, snd = pairs.receiverSender()
@@ -416,9 +410,8 @@ func TestConnectionCloseInterrupt2(t *testing.T) {
// Connection.Close() interrupts Send, Receive, Disposition.
snd, rcv := pairs.senderReceiver()
go doReceive(rcv, results)
- sm, err := snd.Send(amqp.NewMessage())
- fatalIf(t, err)
- go doDisposition(sm, results)
+ ack := snd.SendWaitable(amqp.NewMessage())
+ go doDisposition(ack, results)
snd, rcv = pairs.senderReceiver()
go doSend(snd, results)
rcv, snd = pairs.receiverSender()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 4ff83b4..22bdc7e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -77,6 +77,9 @@ type prefetchPolicy struct{}
func (p prefetchPolicy) Flow(r *receiver) {
r.engine().Inject(func() {
+ if r.Error() != nil {
+ return
+ }
_, _, max := r.credit()
if max > 0 {
r.eLink.Flow(max)
@@ -94,6 +97,9 @@ type noPrefetchPolicy struct{ waiting int }
func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine
r.engine().Inject(func() {
+ if r.Error() != nil {
+ return
+ }
len, credit, max := r.credit()
add := p.waiting - (len + credit)
if add > max {
@@ -202,20 +208,23 @@ type ReceivedMessage struct {
receiver Receiver
}
-// Acknowledge a ReceivedMessage with the given disposition code.
-func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error {
- return rm.receiver.(*receiver).engine().InjectWait(func() error {
- // Settle doesn't return an error but if the receiver is broken the settlement won't happen.
- rm.eDelivery.SettleAs(uint64(disposition))
- return rm.receiver.Error()
+// Acknowledge a ReceivedMessage with the given delivery status.
+func (rm *ReceivedMessage) acknowledge(status uint64) error {
+ return rm.receiver.(*receiver).engine().Inject(func() {
+ // Deliveries are valid as long as the connection is, unless settled.
+ rm.eDelivery.SettleAs(uint64(status))
})
}
-// Accept is short for Acknowledge(Accpeted)
-func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
+// Accept tells the sender that we take responsibility for processing the message.
+func (rm *ReceivedMessage) Accept() error { return rm.acknowledge(proton.Accepted) }
+
+// Reject tells the sender we consider the message invalid and unusable.
+func (rm *ReceivedMessage) Reject() error { return rm.acknowledge(proton.Rejected) }
-// Reject is short for Acknowledge(Rejected)
-func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
+// Release tells the sender we will not process the message but some other
+// receiver might.
+func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Released) }
// IncomingReceiver is sent on the Connection.Incoming() channel when there is
// an incoming request to open a receiver link.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 11f019b..573e9da 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -25,278 +25,237 @@ import "C"
import (
"qpid.apache.org/amqp"
"qpid.apache.org/proton"
- "reflect"
"time"
)
// Sender is a Link that sends messages.
+//
+// The result of sending a message is provided by an Outcome value.
+//
+// A sender can buffer messages up to the credit limit provided by the remote receiver.
+// Send* methods will block if the buffer is full until there is space.
+// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.
+//
type Sender interface {
Link
- // Send a message without waiting for acknowledgement. Returns a SentMessage.
- // use SentMessage.Disposition() to wait for acknowledgement and get the
- // disposition code.
- //
- // If the send buffer is full, send blocks until there is space in the buffer.
- Send(m amqp.Message) (sm SentMessage, err error)
+ // SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
+ // Returns an Outcome, which may contain an error if the message could not be sent.
+ SendSync(m amqp.Message) Outcome
+
+ // SendWaitable puts a message in the send buffer and returns a channel that
+ // you can use to wait for the Outcome of just that message. The channel is
+ // buffered so you can receive from it whenever you want without blocking anything.
+ SendWaitable(m amqp.Message) <-chan Outcome
+
+ // SendForget buffers a message for sending and returns, with no notification of the outcome.
+ SendForget(m amqp.Message)
- // SendTimeout is like send but only waits up to timeout for buffer space.
+ // SendAsync puts a message in the send buffer and returns immediately. An
+ // Outcome with Value = value will be sent to the ack channel when the remote
+ // receiver has acknowledged the message or if there is an error.
//
- // Returns Timeout error if the timeout expires and the message has not been sent.
- SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err error)
+ // You can use the same ack channel for many calls to SendAsync(), possibly on
+ // many Senders. The channel will receive the outcomes in the order they
+ // become available. The channel should be buffered and/or served by dedicated
+ // goroutines to avoid blocking the connection.
+ //
+ // If ack == nil no Outcome is sent.
+ SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})
- // Send a message and forget it, there will be no acknowledgement.
- // If the send buffer is full, send blocks until there is space in the buffer.
- SendForget(m amqp.Message) error
+ // SendAsyncTimeout is like SendAsync but waits at most timeout for space in
+ // the send buffer. If the timeout expires the Outcome has Error set to Timeout.
+ SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration)
- // SendForgetTimeout is like send but only waits up to timeout for buffer space.
- // Returns Timeout error if the timeout expires and the message has not been sent.
- SendForgetTimeout(m amqp.Message, timeout time.Duration) error
+ // SendWaitableTimeout is like SendWaitable but waits at most timeout for
+ // space in the send buffer.
+ SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome
- // Credit indicates how many messages the receiving end of the link can accept.
- //
- // On a Sender credit can be negative, meaning that messages in excess of the
- // receiver's credit limit have been buffered locally till credit is available.
- Credit() (int, error)
-}
+ // SendForgetTimeout is like SendForget but waits at most timeout for space
+ // in the send buffer. No outcome is reported, even if the timeout expires.
+ SendForgetTimeout(m amqp.Message, timeout time.Duration)
-type sendMessage struct {
- m amqp.Message
- sm SentMessage
+ // SendSyncTimeout is like SendSync but gives up after timeout; the timeout
+ // covers both waiting for buffer space and waiting for the acknowledgement.
+ SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
}
-type sender struct {
- link
- credit chan struct{} // Signal available credit.
+// Outcome provides information about the outcome of sending a message.
+type Outcome struct {
+ // Status of the message: was it sent, how was it acknowledged.
+ Status SentStatus
+ // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise.
+ Error error
+ // Value provided by the application in SendAsync()
+ Value interface{}
}
-// Disposition indicates the outcome of a settled message delivery.
-type Disposition uint64
+// SentStatus indicates the status of a sent message.
+type SentStatus int
const (
- // No disposition available: pre-settled, not yet acknowledged or an error occurred
- NoDisposition Disposition = 0
+ // Message was never sent
+ Unsent SentStatus = iota
+ // Message was sent but never acknowledged. It may or may not have been received.
+ Unacknowledged
+ // Message was sent pre-settled, no remote outcome is available.
+ Presettled
// Message was accepted by the receiver
- Accepted = proton.Accepted
+ Accepted
// Message was rejected as invalid by the receiver
- Rejected = proton.Rejected
- // Message was not processed by the receiver but may be processed by some other receiver.
- Released = proton.Released
+ Rejected
+ // Message was not processed by the receiver but may be valid for a different receiver
+ Released
+ // Receiver responded with an unrecognized status.
+ Unknown
)
-// String human readable name for a Disposition.
-func (d Disposition) String() string {
- switch d {
- case NoDisposition:
- return "no-disposition"
+// String returns a human-readable name for a SentStatus. Every declared
+// status, including Presettled, has its own name; any value outside the
+// declared constants is reported as "invalid".
+func (s SentStatus) String() string {
+ switch s {
+ case Unsent:
+ return "unsent"
+ case Unacknowledged:
+ return "unacknowledged"
+ case Presettled:
+ return "presettled"
case Accepted:
return "accepted"
case Rejected:
return "rejected"
case Released:
return "released"
- default:
+ case Unknown:
return "unknown"
+ default:
+ return "invalid"
}
}
-func (s *sender) Send(m amqp.Message) (SentMessage, error) {
- return s.SendTimeout(m, Forever)
-}
-
-func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) (SentMessage, error) {
- var sm SentMessage
- if s.sndSettle == SndSettled {
- sm = nil
- } else {
- sm = newSentMessage(s.session.connection)
+// sentStatus converts a proton delivery state code to a SentStatus value.
+// proton.Modified is reported as Released, the same as proton.Released; any
+// code other than Accepted, Rejected, Released or Modified maps to Unknown.
+func sentStatus(d uint64) SentStatus {
+ switch d {
+ case proton.Accepted:
+ return Accepted
+ case proton.Rejected:
+ return Rejected
+ case proton.Released, proton.Modified:
+ return Released
+ default:
+ return Unknown
}
- return s.sendInternal(sendMessage{m, sm}, timeout)
-}
-
-func (s *sender) SendForget(m amqp.Message) error {
- return s.SendForgetTimeout(m, Forever)
}
-func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error {
- snd := sendMessage{m, nil}
- _, err := s.sendInternal(snd, timeout)
- return err
+// Sender implementation, held by handler.
+type sender struct {
+ link
+ credit chan struct{} // Signal available credit.
}
-func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) {
- if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit
- if err == Closed {
+// SendAsyncTimeout waits up to t for credit and then sends m in the handler
+// goroutine. Unless ack is nil, an Outcome carrying v is sent to ack: Unsent
+// with an error if credit was not obtained, the sender already has an error,
+// the send fails or the function cannot be injected; Presettled if the link is
+// SndSettled. Otherwise the delivery is recorded in the handler's sentMessages
+// so that the outcome can be reported later.
+//
+// If ack is nil no Outcome is reported on any path.
+func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) {
+ // wait for credit
+ if _, err := timedReceive(s.credit, t); err != nil {
+ // Call s.Error(): the bare method value s.Error is never nil.
+ if err == Closed && s.Error() != nil {
err = s.Error()
- assert(err != nil)
}
- return nil, err
+ // ack is nil for SendForget*; a send on a nil channel blocks forever.
+ if ack != nil {
+ ack <- Outcome{Unsent, err, v}
+ }
+ return
}
- if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil {
- return nil, err
- }
- return snd.sm, nil
-}
-
-// Send a message. Handler goroutine
-func (s *sender) doSend(snd sendMessage) {
- delivery, err := s.eLink.Send(snd.m)
- switch sm := snd.sm.(type) {
- case nil:
- delivery.Settle()
- case *sentMessage:
- sm.delivery = delivery
- if err != nil {
- sm.settled(err)
- } else {
- s.handler().sentMessages[delivery] = sm
+ // Send a message in handler goroutine
+ err := s.engine().Inject(func() {
+ if s.Error() != nil {
+ if ack != nil {
+ ack <- Outcome{Unsent, s.Error(), v}
+ }
+ return
}
- default:
- assert(false, "bad SentMessage type %T", snd.sm)
- }
- if s.eLink.Credit() > 0 {
- s.sendable() // Signal credit.
+ if delivery, err := s.eLink.Send(m); err == nil {
+ if ack != nil { // We must report an outcome
+ if s.SndSettle() == SndSettled {
+ delivery.Settle() // Pre-settle if required
+ ack <- Outcome{Presettled, nil, v}
+ } else {
+ s.handler().sentMessages[delivery] = sentMessage{ack, v}
+ }
+ } else { // ack == nil, can't report outcome
+ if s.SndSettle() != SndUnsettled { // Pre-settle unless we are forced not to.
+ delivery.Settle()
+ }
+ }
+ } else { // err != nil
+ if ack != nil {
+ ack <- Outcome{Unsent, err, v}
+ }
+ }
+ if s.eLink.Credit() > 0 { // Signal there is still credit
+ s.sendable()
+ }
+ })
+ if err != nil && ack != nil {
+ ack <- Outcome{Unsent, err, v}
}
}
-// Signal the sender has credit. Any goroutine.
+// Set credit flag if not already set. Non-blocking, any goroutine
func (s *sender) sendable() {
select { // Non-blocking
- case s.credit <- struct{}{}: // Set the flag if not already set.
+ case s.credit <- struct{}{}:
default:
}
}
-func (s *sender) closed(err error) {
- s.link.closed(err)
- close(s.credit)
+func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome {
+ out := make(chan Outcome, 1)
+ s.SendAsyncTimeout(m, out, nil, t)
+ return out
}
-func newSender(l link) *sender {
- s := &sender{link: l, credit: make(chan struct{}, 1)}
- s.handler().addLink(s.eLink, s)
- s.link.open()
- return s
-}
-
-// SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
-type SentMessage interface {
-
- // Disposition blocks till the message is acknowledged and returns the
- // disposition state.
- //
- // NoDisposition with Error() != nil means the Connection was closed before
- // the message was acknowledged.
- //
- // NoDisposition with Error() == nil means the message was pre-settled or
- // Forget() was called.
- Disposition() (Disposition, error)
-
- // DispositionTimeout is like Disposition but gives up after timeout, see Timeout.
- DispositionTimeout(time.Duration) (Disposition, error)
-
- // Forget interrupts any call to Disposition on this SentMessage and tells the
- // peer we are no longer interested in the disposition of this message.
- Forget()
-
- // Error returns the error that closed the disposition, or nil if there was no error.
- // If the disposition closed because the connection closed, it will return Closed.
- Error() error
-
- // Value is an optional value you wish to associate with the SentMessage. It
- // can be the message itself or some form of identifier.
- Value() interface{}
- SetValue(interface{})
-}
-
-// SentMessageSet is a concurrent-safe set of sent messages that can be checked
-// to get the next completed sent message
-type SentMessageSet struct {
- cases []reflect.SelectCase
- sm []SentMessage
- done chan SentMessage
-}
-
-func (s *SentMessageSet) Add(sm SentMessage) {
- s.sm = append(s.sm, sm)
- s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sm.(*sentMessage).done)})
+func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) {
+ s.SendAsyncTimeout(m, nil, nil, t)
}
-// Wait waits up to timeout and returns the next SentMessage that has a valid dispositionb
-// or an error.
-func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) (SentMessage, error) {
- s.cases = s.cases[:len(s.sm)] // Remove previous timeout cases
- if timeout == 0 { // Non-blocking
- s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault})
- } else {
- s.cases = append(s.cases,
- reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
+func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome {
+ deadline := time.Now().Add(t)
+ ack := s.SendWaitableTimeout(m, t)
+ t = deadline.Sub(time.Now()) // Adjust for time already spent.
+ if t < 0 {
+ t = 0
}
- chosen, _, _ := reflect.Select(s.cases)
- if chosen > len(s.sm) {
- return nil, Timeout
+ if out, err := timedReceive(ack, t); err == nil {
+ return out.(Outcome)
} else {
- sm := s.sm[chosen]
- s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...)
- return sm, nil
+ if err == Closed && s.Error() != nil {
+ err = s.Error()
+ }
+ return Outcome{Unacknowledged, err, nil}
}
}
-// SentMessage implementation
-type sentMessage struct {
- connection *connection
- done chan struct{}
- delivery proton.Delivery
- disposition Disposition
- err error
- value interface{}
+func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) {
+ s.SendAsyncTimeout(m, ack, v, Forever)
}
-func newSentMessage(c *connection) *sentMessage {
- return &sentMessage{connection: c, done: make(chan struct{})}
+func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome {
+ return s.SendWaitableTimeout(m, Forever)
}
-func (sm *sentMessage) SetValue(v interface{}) { sm.value = v }
-func (sm *sentMessage) Value() interface{} { return sm.value }
-func (sm *sentMessage) Disposition() (Disposition, error) {
- <-sm.done
- return sm.disposition, sm.err
+func (s *sender) SendForget(m amqp.Message) {
+ s.SendForgetTimeout(m, Forever)
}
-func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
- if _, err := timedReceive(sm.done, timeout); err == Timeout {
- return sm.disposition, Timeout
- } else {
- return sm.disposition, sm.err
- }
+func (s *sender) SendSync(m amqp.Message) Outcome {
+ return <-s.SendWaitable(m)
}
-func (sm *sentMessage) Forget() {
- sm.connection.engine.Inject(func() {
- sm.delivery.Settle()
- delete(sm.connection.handler.sentMessages, sm.delivery)
- })
- sm.finish()
+// handler goroutine
+func (s *sender) closed(err error) {
+ s.link.closed(err)
+ close(s.credit)
}
-func (sm *sentMessage) settled(err error) {
- if sm.delivery.Settled() {
- sm.disposition = Disposition(sm.delivery.Remote().Type())
- }
- sm.err = err
- sm.finish()
+func newSender(l link) *sender {
+ s := &sender{link: l, credit: make(chan struct{}, 1)}
+ s.handler().addLink(s.eLink, s)
+ s.link.open()
+ return s
}
-func (sm *sentMessage) finish() {
- select {
- case <-sm.done: // No-op if already closed
- default:
- close(sm.done)
- }
+// sentMessage records a sent message on the handler.
+type sentMessage struct {
+ ack chan<- Outcome
+ value interface{}
}
-func (sm *sentMessage) Error() error { return sm.err }
-
// IncomingSender is sent on the Connection.Incoming() channel when there is
// an incoming request to open a sender link.
type IncomingSender struct {
@@ -307,3 +266,9 @@ type IncomingSender struct {
func (in *IncomingSender) Accept() Endpoint {
return in.accept(func() Endpoint { return newSender(in.link) })
}
+
+// valid reports whether the handler's links map still associates s.eLink with
+// this sender. Call in injected functions to check if the sender is valid.
+func (s *sender) valid() bool {
+ s2, ok := s.handler().links[s.eLink].(*sender)
+ return ok && s2 == s
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index d347c99..18d8bc8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -53,7 +53,7 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses
s := &session{
connection: c,
eSession: es,
- endpoint: endpoint{str: es.String()},
+ endpoint: makeEndpoint(es.String()),
}
for _, set := range setting {
set(s)
@@ -67,12 +67,20 @@ func newSession(c *connection, es proton.Session, setting ...SessionOption) *ses
func (s *session) Connection() Connection { return s.connection }
func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
func (s *session) engine() *proton.Engine { return s.connection.engine }
+
func (s *session) Close(err error) {
- s.engine().Inject(func() { localClose(s.eSession, err) })
+ s.engine().Inject(func() {
+ if s.Error() == nil {
+ localClose(s.eSession, err)
+ }
+ })
}
func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
err = s.engine().InjectWait(func() error {
+ if s.Error() != nil {
+ return s.Error()
+ }
l, err := localLink(s, true, setting...)
if err == nil {
snd = newSender(l)
@@ -84,6 +92,9 @@ func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
err = s.engine().InjectWait(func() error {
+ if s.Error() != nil {
+ return s.Error()
+ }
l, err := localLink(s, false, setting...)
if err == nil {
rcv = newReceiver(l)
@@ -93,12 +104,6 @@ func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
return
}
-// Called from handler on closed.
-func (s *session) closed(err error) {
- s.err.Set(err)
- s.err.Set(Closed)
-}
-
// IncomingSender is sent on the Connection.Incoming() channel when there is an
// incoming request to open a session.
type IncomingSession struct {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
index 0015185..51bfbc5 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
@@ -21,6 +21,7 @@ package electron
import (
"fmt"
+ "math"
"reflect"
"time"
)
@@ -41,7 +42,7 @@ import (
var Timeout = fmt.Errorf("timeout")
// Forever can be used as a timeout parameter to indicate wait forever.
-const Forever time.Duration = -1
+const Forever time.Duration = math.MaxInt64
// timedReceive receives on channel (which can be a chan of any type), waiting
// up to timeout.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/cd8ad96f/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 95d70e9..2e67ef7 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -315,6 +315,13 @@ func (eng *Engine) Run() error {
wait.Wait()
close(eng.running) // Signal goroutines have exited and Error is set.
+ // Execute any injected functions for side effects on application data structures.
+ inject := eng.inject
+ eng.inject = nil // Further calls to Inject() will return an error.
+ for f := range inject {
+ f()
+ }
+
if !eng.connection.IsNil() {
eng.connection.Free()
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org