You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/10/23 00:16:18 UTC
qpid-proton git commit: NO-JIRA: go: Bug fixes and improved examples.
Repository: qpid-proton
Updated Branches:
refs/heads/master 6a306163f -> bc0a242e4
NO-JIRA: go: Bug fixes and improved examples.
package proton:
- Injecter() provided by event rather than connection. Allow different event-loop strategies.
- Add access to proton refcounts, may be useful for some apps.
package electron:
- simplified sender logic using credit flag.
- consistent link, session and connection options.
examples: simplified & improved
proton/broker: concurrent broker using handler per connection.
electron/broker: cleaned up for comparison to proton/broker.
examples/README.md discussion of brokers.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/bc0a242e
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/bc0a242e
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/bc0a242e
Branch: refs/heads/master
Commit: bc0a242e4cc335f2aa496b98bf06d033904ca23d
Parents: 6a30616
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Oct 13 10:31:01 2015 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 22 18:14:17 2015 -0400
----------------------------------------------------------------------
examples/go/CMakeLists.txt | 9 +-
examples/go/README.md | 31 ++
examples/go/electron/broker.go | 67 ++--
examples/go/electron/receive.go | 5 +-
examples/go/electron/send.go | 2 -
examples/go/example_test.go | 57 ++-
examples/go/proton/broker.go | 389 ++++++++++---------
examples/go/util/util.go | 2 +-
proton-c/bindings/go/CMakeLists.txt | 7 +-
.../src/qpid.apache.org/electron/connection.go | 151 ++++---
.../src/qpid.apache.org/electron/container.go | 6 +-
.../go/src/qpid.apache.org/electron/handler.go | 102 +++--
.../go/src/qpid.apache.org/electron/link.go | 67 ++--
.../go/src/qpid.apache.org/electron/receiver.go | 60 +--
.../go/src/qpid.apache.org/electron/sender.go | 120 +++---
.../go/src/qpid.apache.org/electron/session.go | 49 ++-
.../go/src/qpid.apache.org/electron/time.go | 19 +-
.../go/src/qpid.apache.org/proton/doc.go | 29 +-
.../go/src/qpid.apache.org/proton/engine.go | 71 ++--
.../go/src/qpid.apache.org/proton/handlers.go | 18 +-
.../go/src/qpid.apache.org/proton/wrappers.go | 98 +++--
.../qpid/proton/amqp/messaging/Terminus.java | 2 +-
22 files changed, 736 insertions(+), 625 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt
index c345523..32be548 100644
--- a/examples/go/CMakeLists.txt
+++ b/examples/go/CMakeLists.txt
@@ -20,6 +20,7 @@
if(BUILD_GO)
set(examples electron/broker electron/receive electron/send proton/broker)
+ file(GLOB_RECURSE example_source ${CMAKE_CURRENT_SOURCE_DIR}/*.go)
# Build example exes
foreach(example ${examples})
@@ -28,7 +29,8 @@ if(BUILD_GO)
add_custom_command(
OUTPUT ${target}
COMMAND ${GO_BUILD} ${GO_EXAMPLE_FLAGS} -o ${target} ${source}
- DEPENDS ${source} ${GO_TARGETS})
+ DEPENDS ${example_source} ${GO_TARGETS}
+ WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
list(APPEND example_exes ${target})
endforeach()
@@ -36,8 +38,9 @@ if(BUILD_GO)
set(test_exe ${CMAKE_CURRENT_BINARY_DIR}/example_test)
add_custom_command(
OUTPUT ${test_exe}
- DEPENDS ${example_exes}
- COMMAND ${GO_TEST} -c -o ${test_exe} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go)
+ DEPENDS ${example_exes} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go
+ COMMAND ${GO_TEST} -c -o ${test_exe} ${CMAKE_CURRENT_SOURCE_DIR}/example_test.go
+ WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
add_custom_target(go-test-exe ALL DEPENDS ${test_exe})
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/README.md
----------------------------------------------------------------------
diff --git a/examples/go/README.md b/examples/go/README.md
index 0114d0e..ce9206b 100644
--- a/examples/go/README.md
+++ b/examples/go/README.md
@@ -87,3 +87,34 @@ Or use the Go broker and the python clients:
python ../python/simple_send.py
python ../python/simple_recv.py
+
+
+## A tale of two brokers.
+
+The `proton` and `electron` packages provide two alternate APIs for AMQP applications.
+See [the proton Go README](https://github.com/apache/qpid-proton/blob/master/proton-c/bindings/go/src/qpid.apache.org/README.md) for a discussion
+of why there are two APIs.
+
+The examples `proton/broker.go` and `electron/broker.go` both implement the same
+simple broker-like functionality using each of the two APIs. They both handle
+multiple connections concurrently and store messages on bounded queues
+implemented by Go channels.
+
+However, the `electron/broker` is less than half as long as the `proton/broker`,
+illustrating why it is better suited for most Go applications.
+
+`proton/broker` must explicitly handle proton events, which are processed in a
+single goroutine per connection since proton is not concurrent safe. Each
+connection uses channels to exchange messages between the event-handling
+goroutine and the shared queues that are accessible to all connections. Sending
+messages is particularly tricky since we must monitor the queue for available
+messages and the sending link for available credit in order to send messages.
+
+
+`electron/broker` takes advantage of the `electron` package, which hides all the
+event handling and passing of messages between goroutines behind
+straightforward interfaces for sending and receiving messages. The electron
+broker can implement links as simple goroutines that loop popping messages from
+a queue and sending them or receiving messages and pushing them to a queue.
+
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index 1e4a931..f1dce17 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -52,21 +52,20 @@ var qsize = flag.Int("qsize", 1000, "Max queue size")
func main() {
flag.Usage = usage
flag.Parse()
- if err := newBroker().run(); err != nil {
+ b := &broker{util.MakeQueues(*qsize), electron.NewContainer("")}
+ if err := b.run(); err != nil {
log.Fatal(err)
}
}
+// State for the broker
type broker struct {
queues util.Queues
container electron.Container
}
-func newBroker() *broker {
- return &broker{util.MakeQueues(*qsize), electron.NewContainer("")}
-}
-
-func (b *broker) run() (err error) {
+// Listens for connections and starts an electron.Connection for each one.
+func (b *broker) run() error {
listener, err := net.Listen("tcp", *addr)
if err != nil {
return err
@@ -76,46 +75,29 @@ func (b *broker) run() (err error) {
for {
conn, err := listener.Accept()
if err != nil {
- util.Debugf("Accept error: %s", err)
+ util.Debugf("Accept error: %v", err)
continue
}
- if err := b.connection(conn); err != nil {
- if err != nil {
- util.Debugf("Connection error: %s", err)
- continue
- }
+ c, err := b.container.Connection(conn, electron.Server(), electron.Accepter(b.accept))
+ if err != nil {
+ util.Debugf("Connection error: %v", err)
+ continue
}
+ util.Debugf("Accepted %v", c)
}
}
-// connection creates a new AMQP connection for a net.Conn.
-func (b *broker) connection(conn net.Conn) error {
- c, err := b.container.Connection(conn)
- if err != nil {
- return err
- }
- c.Server() // Enable server-side protocol negotiation.
- c.Listen(b.accept) // Call accept() for remotely-opened endpoints.
- if err := c.Open(); err != nil {
- return err
- }
- util.Debugf("Accepted %s", c)
- return nil
-}
-
// accept remotely-opened endpoints (Session, Sender and Receiver)
// and start goroutines to service them.
-func (b *broker) accept(ep electron.Endpoint) error {
- switch ep := ep.(type) {
- case electron.Sender:
- util.Debugf("%s opened", ep)
- go b.sender(ep)
- case electron.Receiver:
- util.Debugf("%s opened", ep)
- ep.SetCapacity(100, true) // Pre-fetch 100 messages
- go b.receiver(ep)
+func (b *broker) accept(i electron.Incoming) {
+ switch i := i.(type) {
+ case *electron.IncomingSender:
+ go b.sender(i.AcceptSender())
+ case *electron.IncomingReceiver:
+ go b.receiver(i.AcceptReceiver(100, true)) // Pre-fetch 100 messages
+ default:
+ i.Accept()
}
- return nil
}
// sender pops messages from a queue and sends them.
@@ -127,17 +109,16 @@ func (b *broker) sender(sender electron.Sender) {
return
}
if err := sender.SendForget(m); err == nil {
- util.Debugf("send %s: %s", sender, util.FormatMessage(m))
+ util.Debugf("%s send: %s", sender, util.FormatMessage(m))
} else {
- util.Debugf("send error %s: %s", sender, err)
+ util.Debugf("%s error: %s", sender, err)
q <- m // Put it back on the queue.
- break
+ return
}
}
}
-// receiver receives messages and pushes to the queue named by the receivers's
-// Target address
+// receiver receives messages and pushes to a queue.
func (b *broker) receiver(receiver electron.Receiver) {
q := b.queues.Get(receiver.Target())
for {
@@ -146,7 +127,7 @@ func (b *broker) receiver(receiver electron.Receiver) {
q <- rm.Message
rm.Accept()
} else {
- util.Debugf("%s: error %s", receiver, err)
+ util.Debugf("%s error: %s", receiver, err)
break
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index e450a75..f7d41fa 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -77,8 +77,6 @@ func main() {
conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
util.ExitIf(err)
c, err := container.Connection(conn)
- util.ExitIf(err)
- util.ExitIf(c.Open())
connections <- c // Save connection so we can Close() when main() ends
// Create a Receiver using the path of the URL as the source address
@@ -106,9 +104,8 @@ func main() {
// print each message until the count is exceeded.
for i := uint64(0); i < *count; i++ {
- util.Debugf("pre (%d/%d)\n", i, *count)
m := <-messages
- util.Debugf("%s (%d/%d)\n", util.FormatMessage(m), i, *count)
+ util.Debugf("%s\n", util.FormatMessage(m))
}
fmt.Printf("Received %d messages\n", *count)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index 6b7aec1..c9bdbc9 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -80,8 +80,6 @@ func main() {
util.ExitIf(err)
c, err := container.Connection(conn)
util.ExitIf(err)
- err = c.Open()
- util.ExitIf(err)
connections <- c // Save connection so we can Close() when main() ends
// Create a Sender using the path of the URL as the AMQP address
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/example_test.go
----------------------------------------------------------------------
diff --git a/examples/go/example_test.go b/examples/go/example_test.go
index 1e497b9..006e17c 100644
--- a/examples/go/example_test.go
+++ b/examples/go/example_test.go
@@ -28,6 +28,7 @@ import (
"flag"
"fmt"
"io"
+ "log"
"math/rand"
"net"
"os"
@@ -35,7 +36,6 @@ import (
"path"
"path/filepath"
"reflect"
- "strings"
"testing"
"time"
)
@@ -108,27 +108,8 @@ func checkEqual(want interface{}, got interface{}) error {
return fmt.Errorf("%#v != %#v", want, got)
}
-// 'go build' uses the installed copy of the proton Go libraries, which may be out of date.
-func checkStaleLibs(t *testing.T) {
- var stale []string
- pp := "qpid.apache.org"
- for _, p := range []string{pp + "/proton", pp + "/amqp", pp + "/electron"} {
- out, err := exec.Command("go", "list", "-f", "{{.Stale}}", p).CombinedOutput()
- if err != nil {
- t.Fatalf("failed to execute 'go list': %v\n%v", err, string(out))
- }
- if string(out) != "false\n" {
- stale = append(stale, p)
- }
- }
- if len(stale) > 0 {
- t.Fatalf("Stale libraries, run 'go install %s'", strings.Trim(fmt.Sprint(stale), "[]"))
- }
-}
-
// exampleCommand returns an exec.Cmd to run an example.
func exampleCommand(t *testing.T, prog string, arg ...string) (cmd *exec.Cmd) {
- checkStaleLibs(t)
args := []string{}
if *debug {
args = append(args, "-debug=true")
@@ -230,6 +211,7 @@ func goReceiveWant(t *testing.T, errchan chan<- error, want string, arg ...strin
errchan <- ready
buf := bytes.Buffer{}
io.Copy(&buf, out) // Collect the rest of the output
+ cmd.Wait()
errchan <- checkEqual(want, buf.String())
close(errchan)
}()
@@ -242,26 +224,30 @@ func TestExampleReceiveSend(t *testing.T) {
t.Skip("Skip demo tests in short mode")
}
testBroker.start(t)
- recvErr := make(chan error)
- recvCmd := goReceiveWant(
- t, recvErr,
- fmt.Sprintf("Received %d messages\n", expected),
- exampleArgs(fmt.Sprintf("-count=%d", expected))...)
- defer func() {
- recvCmd.Process.Kill()
- recvCmd.Wait()
- }()
- if err := <-recvErr; err != ready { // Wait for receiver ready
+
+ // Start receiver, wait for "listening" message on stdout
+ recvCmd := exampleCommand(t, "receive", exampleArgs(fmt.Sprintf("-count=%d", expected))...)
+ pipe, err := recvCmd.StdoutPipe()
+ if err != nil {
t.Fatal(err)
}
- err := runExampleWant(t,
+ recvCmd.Start()
+ out := bufio.NewReader(pipe)
+ line, err := out.ReadString('\n')
+ if err := checkEqual("Listening on 3 connections\n", line); err != nil {
+ t.Fatal(err)
+ }
+
+ if err := runExampleWant(t,
fmt.Sprintf("Received all %d acknowledgements\n", expected),
"send",
- exampleArgs("-count", fmt.Sprintf("%d", *count))...)
- if err != nil {
+ exampleArgs("-count", fmt.Sprintf("%d", *count))...); err != nil {
t.Fatal(err)
}
- if err := <-recvErr; err != nil {
+
+ buf := bytes.Buffer{}
+ io.Copy(&buf, out)
+ if err := checkEqual(fmt.Sprintf("Received %d messages\n", expected), buf.String()); err != nil {
t.Fatal(err)
}
}
@@ -276,6 +262,9 @@ var dir = flag.String("dir", "", "Directory containing example sources or binari
var expected int
func TestMain(m *testing.M) {
+ if out, err := exec.Command("go", "install", "qpid.apache.org/...").CombinedOutput(); err != nil {
+ log.Fatalf("go install failed: %s\n%s", err, out)
+ }
expected = (*count) * (*connections)
rand.Seed(time.Now().UTC().UnixNano())
testBroker = &broker{} // Broker is started on-demand by tests.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/proton/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go
index 75f14f5..3eb5880 100644
--- a/examples/go/proton/broker.go
+++ b/examples/go/proton/broker.go
@@ -30,7 +30,7 @@ import (
"./util"
"flag"
"fmt"
- "io"
+ "log"
"net"
"os"
"qpid.apache.org/amqp"
@@ -53,247 +53,276 @@ var qsize = flag.Int("qsize", 1000, "Max queue size")
func main() {
flag.Usage = usage
flag.Parse()
+ b := &broker{util.MakeQueues(*qsize)}
+ if err := b.run(); err != nil {
+ log.Fatal(err)
+ }
+}
+
+// State for the broker
+type broker struct {
+ queues util.Queues
+}
- b := newBroker()
+// Listens for connections and starts a proton.Engine for each one.
+func (b *broker) run() error {
listener, err := net.Listen("tcp", *addr)
- util.ExitIf(err)
+ if err != nil {
+ return err
+ }
defer listener.Close()
fmt.Printf("Listening on %s\n", listener.Addr())
-
- // Loop accepting new connections.
for {
conn, err := listener.Accept()
if err != nil {
- util.Debugf("Accept error: %s", err)
+ util.Debugf("Accept error: %v", err)
continue
}
- if err := b.connection(conn); err != nil {
- if err != nil {
- util.Debugf("Connection error: %s", err)
- continue
- }
+ adapter := proton.NewMessagingAdapter(newHandler(&b.queues))
+ // We want to accept messages when they are enqueued, not just when they
+ // are received, so we turn off auto-accept and prefetch by the adapter.
+ adapter.Prefetch = 0
+ adapter.AutoAccept = false
+ engine, err := proton.NewEngine(conn, adapter)
+ if err != nil {
+ util.Debugf("Connection error: %v", err)
+ continue
}
+ engine.Server() // Enable server-side protocol negotiation.
+ util.Debugf("Accepted connection %s", engine)
+ go func() { // Start goroutine to run the engine event loop
+ engine.Run()
+ util.Debugf("Closed %s", engine)
+ }()
}
}
-type broker struct {
- queues util.Queues
-}
-
-func newBroker() *broker {
- return &broker{util.MakeQueues(*qsize)}
-}
-
-// connection creates a new AMQP connection for a net.Conn.
-func (b *broker) connection(conn net.Conn) error {
- delegator := proton.NewMessagingDelegator(newHandler(&b.queues, *credit))
- // We want to accept messages when they are enqueued, not just when they
- // are received, so we turn off auto-accept and prefetch by the handler.
- delegator.Prefetch = 0
- delegator.AutoAccept = false
- engine, err := proton.NewEngine(conn, delegator)
- if err != nil {
- return err
- }
- engine.Server() // Enable server-side protocol negotiation.
- go func() { // Start goroutine to run the engine event loop
- engine.Run()
- util.Debugf("Closed %s", engine)
- }()
- util.Debugf("Accepted %s", engine)
- return nil
-}
-
-// receiver is a channel to buffer messages waiting to go on the queue.
-type receiver chan receivedMessage
-
-// receivedMessage is a message and the corresponding delivery for acknowledgement.
-type receivedMessage struct {
- delivery proton.Delivery
- message amqp.Message
-}
-
-// sender is a signal channel, closed when we are done sending.
-type sender chan struct{}
-
// handler handles AMQP events. There is one handler per connection. The
-// handler does not need to be concurrent-safe as proton will serialize all
-// calls to a handler. We will use channels to communicate from the handler
-// to goroutines sending and receiving messages.
+// handler does not need to be concurrent-safe as proton.Engine will serialize
+// all calls to the handler. We use channels to communicate between the handler
+// goroutine and other goroutines sending and receiving messages.
type handler struct {
queues *util.Queues
- credit int // Credit window for receiver flow control.
- receivers map[proton.Link]receiver
- senders map[proton.Link]sender
+ receivers map[proton.Link]*receiver
+ senders map[proton.Link]*sender
+ injecter proton.Injecter
}
-func newHandler(queues *util.Queues, credit int) *handler {
+func newHandler(queues *util.Queues) *handler {
return &handler{
- queues,
- credit,
- make(map[proton.Link]receiver),
- make(map[proton.Link]sender),
+ queues: queues,
+ receivers: make(map[proton.Link]*receiver),
+ senders: make(map[proton.Link]*sender),
}
}
-// Handle an AMQP event.
+// HandleMessagingEvent handles an event, called in the handler goroutine.
func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
switch t {
+ case proton.MStart:
+ h.injecter = e.Injecter()
+
case proton.MLinkOpening:
- l := e.Link()
- var err error
- if l.IsReceiver() {
- err = h.receiver(l)
- } else { // IsSender()
- err = h.sender(l)
- }
- if err == nil {
- util.Debugf("%s opened", l)
+ if e.Link().IsReceiver() {
+ h.startReceiver(e)
} else {
- util.Debugf("%s open error: %s", l, err)
- proton.CloseError(l, err)
+ h.startSender(e)
}
- case proton.MLinkClosing:
- l := e.Link()
- if r, ok := h.receivers[l]; ok {
- close(r)
- delete(h.receivers, l)
- } else if s, ok := h.senders[l]; ok {
- close(s)
- delete(h.senders, l)
- }
- util.Debugf("%s closed", l)
+ case proton.MLinkClosed:
+ h.linkClosed(e.Link(), e.Link().RemoteCondition().Error())
case proton.MSendable:
- l := e.Link()
- q := h.queues.Get(l.RemoteSource().Address())
- if n, err := h.sendAll(e.Link(), q); err == nil && n > 0 {
- // Still have credit, start a watcher.
- go h.sendWatch(e.Link(), q)
+ if s, ok := h.senders[e.Link()]; ok {
+ s.sendable() // Signal the send goroutine that we have credit.
+ } else {
+ proton.CloseError(e.Link(), amqp.Errorf(amqp.NotFound, "link %s sender not found", e.Link()))
}
case proton.MMessage:
- l := e.Link()
- d := e.Delivery()
- m, err := d.Message() // Must decode message immediately before link state changes.
+ m, err := e.Delivery().Message() // Message() must be called while handling the MMessage event.
if err != nil {
- util.Debugf("%s error decoding message: %s", e.Link(), err)
- proton.CloseError(l, err)
- } else {
- // This will not block, AMQP credit prevents us from overflowing the buffer.
- h.receivers[l] <- receivedMessage{d, m}
- util.Debugf("%s received %s", l, util.FormatMessage(m))
+ proton.CloseError(e.Link(), err)
+ break
}
+ r, ok := h.receivers[e.Link()]
+ if !ok {
+ proton.CloseError(e.Link(), amqp.Errorf(amqp.NotFound, "link %s receiver not found", e.Link()))
+ break
+ }
+ // This will not block as AMQP credit is set to the buffer capacity.
+ r.buffer <- receivedMessage{e.Delivery(), m}
+ util.Debugf("link %s received %s", e.Link(), util.FormatMessage(m))
- case proton.MConnectionClosing, proton.MDisconnected:
- for l, r := range h.receivers {
- close(r)
- delete(h.receivers, l)
+ case proton.MConnectionClosed, proton.MDisconnected:
+ for l, _ := range h.receivers {
+ h.linkClosed(l, nil)
}
- for l, s := range h.senders {
- close(s)
- delete(h.senders, l)
+ for l, _ := range h.senders {
+ h.linkClosed(l, nil)
}
}
}
-// receiver is called by the handler when a receiver link opens.
+// linkClosed is called when a link has been closed by both ends.
+// It removes the link from the handler's maps and stops its goroutine.
+func (h *handler) linkClosed(l proton.Link, err error) {
+ if s, ok := h.senders[l]; ok {
+ s.stop()
+ delete(h.senders, l)
+ } else if r, ok := h.receivers[l]; ok {
+ r.stop()
+ delete(h.receivers, l)
+ }
+}
+
+// link has some common data and methods that are used by the sender and receiver types.
//
-// It sets up data structures in the handler and then starts a goroutine
-// to receive messages and put them on a queue.
-func (h *handler) receiver(l proton.Link) error {
- q := h.queues.Get(l.RemoteTarget().Address())
- buffer := make(receiver, h.credit)
- h.receivers[l] = buffer
- l.Flow(cap(buffer)) // credit==cap(buffer) so we won't overflow the buffer.
- go h.runReceive(l, buffer, q)
- return nil
+// An active link is represented by a sender or receiver value and a goroutine
+// running its run() method. The run() method communicates with the handler via
+// channels.
+type link struct {
+ l proton.Link
+ q util.Queue
+ h *handler
+}
+
+func makeLink(l proton.Link, q util.Queue, h *handler) link {
+ lnk := link{l: l, q: q, h: h}
+ return lnk
+}
+
+// receiver has a channel to buffer messages that have been received by the
+// handler and are waiting to go on the queue. AMQP credit ensures that the
+// handler does not overflow the buffer and block.
+type receiver struct {
+ link
+ buffer chan receivedMessage
}
-// runReceive moves messages from buffer to queue
-func (h *handler) runReceive(l proton.Link, buffer receiver, q util.Queue) {
- for rm := range buffer {
- q <- rm.message
- rm2 := rm // Save in temp var for injected closure
- err := l.Connection().Injecter().Inject(func() {
- rm2.delivery.Accept()
- l.Flow(1)
+// receivedMessage holds a message and a Delivery so that the message can be
+// acknowledged when it is put on the queue.
+type receivedMessage struct {
+ delivery proton.Delivery
+ message amqp.Message
+}
+
+// startReceiver creates a receiver and a goroutine for its run() method.
+func (h *handler) startReceiver(e proton.Event) {
+ q := h.queues.Get(e.Link().RemoteTarget().Address())
+ r := &receiver{
+ link: makeLink(e.Link(), q, h),
+ buffer: make(chan receivedMessage, *credit),
+ }
+ h.receivers[r.l] = r
+ r.l.Flow(cap(r.buffer)) // Give credit to fill the buffer to capacity.
+ go r.run()
+}
+
+// run runs in a separate goroutine. It moves messages from the buffer to the
+// queue for a receiver link, and injects a handler function to acknowledge the
+// message and send a credit.
+func (r *receiver) run() {
+ for rm := range r.buffer {
+ r.q <- rm.message
+ d := rm.delivery
+ // We are not in the handler goroutine so we Inject the accept function as a closure.
+ r.h.injecter.Inject(func() {
+ // Check that the receiver is still open, it may have been closed by the remote end.
+ if r == r.h.receivers[r.l] {
+ d.Accept() // Accept the delivery
+ r.l.Flow(1) // Add one credit
+ }
})
- if err != nil {
- util.Debugf("%s receive error: %s", l, err)
- proton.CloseError(l, err)
- }
}
}
-// sender is called by the handler when a sender link opens.
-// It sets up a sender structures in the handler.
-func (h *handler) sender(l proton.Link) error {
- h.senders[l] = make(sender)
- return nil
+// stop closes the buffer channel, which ends the range loop in the run() goroutine.
+func (r *receiver) stop() {
+ close(r.buffer)
}
-// send one message in handler context, assumes we have credit.
-func (h *handler) send(l proton.Link, m amqp.Message, q util.Queue) error {
- delivery, err := l.Send(m)
- if err != nil {
- h.closeSender(l, err)
- return err
+// sender has a channel that is used to signal when there is credit to send messages.
+type sender struct {
+ link
+ credit chan struct{} // Channel to signal availability of credit.
+}
+
+// startSender creates a sender and starts a goroutine for sender.run()
+func (h *handler) startSender(e proton.Event) {
+ q := h.queues.Get(e.Link().RemoteSource().Address())
+ s := &sender{
+ link: makeLink(e.Link(), q, h),
+ credit: make(chan struct{}, 1), // Capacity of 1 for signalling.
}
- delivery.Settle() // Pre-settled, unreliable.
- util.Debugf("%s sent %s", l, util.FormatMessage(m))
- return nil
+ h.senders[e.Link()] = s
+ go s.run()
}
-// sendAll sends as many messages as possible without blocking, call in handler context.
-// Returns the number of credits left, >0 means we ran out of messages.
-func (h *handler) sendAll(l proton.Link, q util.Queue) (int, error) {
- for l.Credit() > 0 {
- select {
- case m, ok := <-q:
- if ok { // Got a message
- if err := h.send(l, m, q); err != nil {
- return 0, err
- }
- } else { // Queue is closed
- l.Close()
- return 0, io.EOF
- }
- default: // Queue empty
- return l.Credit(), nil
- }
+// stop closes the credit channel, which tells the run() goroutine to return.
+func (s *sender) stop() {
+ close(s.credit)
+}
+
+// sendable signals that the sender has credit, it does not block.
+// sender.credit has capacity 1, if it is already full we carry on.
+func (s *sender) sendable() {
+ select { // Non-blocking
+ case s.credit <- struct{}{}:
+ default:
}
- return l.Credit(), nil
}
-// sendWatch watches the queue for more messages and re-runs sendAll.
-// Run in a separate goroutine, so must inject handler functions.
-func (h *handler) sendWatch(l proton.Link, q util.Queue) {
- select {
- case m, ok := <-q:
- l.Connection().Injecter().Inject(func() {
- if ok {
- if h.send(l, m, q) != nil {
+// run runs in a separate goroutine. It monitors the queue for messages and injects
+// a function to send them when there is credit
+func (s *sender) run() {
+ var q chan amqp.Message // q is nil initially as we have no credit.
+ for {
+ select {
+ case _, ok := <-s.credit:
+ if !ok { // sender closed
+ return
+ }
+ q = s.q // We have credit, enable selecting on the queue.
+
+ case m, ok := <-q: // q is only enabled when we have credit.
+ if !ok { // queue closed
+ return
+ }
+ q = nil // Assume all credit will be used, will be signaled otherwise.
+ s.h.injecter.Inject(func() { // Inject handler function to actually send
+ if s.h.senders[s.l] != s { // The sender has been closed by the remote end.
+ go func() { q <- m }() // Put the message back on the queue but don't block
return
}
- if n, err := h.sendAll(l, q); err != nil {
+ if s.sendOne(m) != nil {
return
- } else if n > 0 {
- go h.sendWatch(l, q) // Start a new watcher.
}
- }
- })
- case <-h.senders[l]: // Closed
- return
+ // Send as many more messages as we can without blocking
+ for s.l.Credit() > 0 {
+ select { // Non blocking receive from q
+ case m, ok := <-s.q:
+ if ok {
+ s.sendOne(m)
+ }
+ default: // Queue is empty but we have credit, signal the run() goroutine.
+ s.sendable()
+ }
+ }
+ })
+ }
}
}
-// closeSender closes a sender link and signals goroutines processing that sender.
-func (h *handler) closeSender(l proton.Link, err error) {
- util.Debugf("%s sender closed: %s", l, err)
- proton.CloseError(l, err)
- close(h.senders[l])
- delete(h.senders, l)
+// sendOne runs in the handler goroutine. It sends a single message.
+func (s *sender) sendOne(m amqp.Message) error {
+ delivery, err := s.l.Send(m)
+ if err == nil {
+ delivery.Settle() // Pre-settled, unreliable.
+ util.Debugf("link %s sent %s", s.l, util.FormatMessage(m))
+ } else {
+ go func() { s.q <- m }() // Put the message back on the queue but don't block
+ }
+ return err
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/examples/go/util/util.go
----------------------------------------------------------------------
diff --git a/examples/go/util/util.go b/examples/go/util/util.go
index 5118467..20f2192 100644
--- a/examples/go/util/util.go
+++ b/examples/go/util/util.go
@@ -64,5 +64,5 @@ func FormatMessage(m amqp.Message) string {
func init() {
log.SetFlags(0)
_, prog := path.Split(os.Args[0])
- log.SetPrefix(prog + ": ")
+ log.SetPrefix(fmt.Sprintf("%s(%d): ", prog, os.Getpid()))
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/CMakeLists.txt b/proton-c/bindings/go/CMakeLists.txt
index 51c2d86..74c7e13 100644
--- a/proton-c/bindings/go/CMakeLists.txt
+++ b/proton-c/bindings/go/CMakeLists.txt
@@ -75,14 +75,17 @@ foreach (pkg amqp proton electron)
set(sources "${GoFiles}${CgoFiles}")
# Build the package library
- add_custom_command(OUTPUT ${lib} COMMAND ${GO_INSTALL} ${package} DEPENDS ${sources} ${cdepends})
+ add_custom_command(
+ OUTPUT ${lib} COMMAND ${GO_INSTALL} ${package} DEPENDS ${sources} ${cdepends}
+ WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
set(target go-package-${pkg})
add_custom_target(${target} ALL DEPENDS ${lib})
# Package test
go_sources(TestGoFiles)
set(test_exe ${CMAKE_CURRENT_BINARY_DIR}/${pkg}.test)
- add_custom_command(OUTPUT ${test_exe} COMMAND ${GO_TEST} -c -o ${test_exe} ${package}
+ add_custom_command(
+ OUTPUT ${test_exe} COMMAND ${GO_TEST} -c -o ${test_exe} ${package}
DEPENDS ${target} qpid-proton)
add_custom_target(go-package-test-${pkg} ALL DEPENDS ${test_exe})
add_test(NAME go_test_${pkg} COMMAND ${test_exe} WORKING_DIRECTORY ${dir})
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index bef8c17..d6761d6 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -28,131 +28,114 @@ import (
"qpid.apache.org/internal"
"qpid.apache.org/proton"
"sync"
+ "time"
)
// Connection is an AMQP connection, created by a Container.
type Connection interface {
Endpoint
- // Server puts the connection in server mode, must be called before Open().
- //
- // A server connection will do protocol negotiation to accept a incoming AMQP
- // connection. Normally you would call this for a connection created by
- // net.Listener.Accept()
- //
- Server()
-
- // Listen arranges for endpoints opened by the remote peer to be passed to accept().
- // Listen() must be called before Connection.Open().
- //
- // accept() is passed a Session, Sender or Receiver. It can examine endpoint
- // properties and set some properties (e.g. Receiver.SetCapacity()) Returning nil
- // will accept the endpoint, returning an error will reject it.
- //
- // accept() must not block or use the endpoint other than to examine or set
- // properties. It can start a goroutine to process the Endpoint, or pass the
- // Endpoint to another goroutine via a channel, and that goroutine can use
- // the endpoint as normal.
- //
- // The default Listen function is RejectEndpoint which rejects all endpoints.
- // You can call Listen(AcceptEndpoint) to accept all endpoints
- Listen(accept func(Endpoint) error)
-
- // Open the connection, ready for use.
- Open() error
-
// Sender opens a new sender on the DefaultSession.
//
// v can be a string, which is used as the Target address, or a SenderSettings
// struct containing more detailed settings.
- Sender(setting ...LinkSetting) (Sender, error)
+ Sender(...LinkSetting) (Sender, error)
// Receiver opens a new Receiver on the DefaultSession().
//
// v can be a string, which is used as the
// Source address, or a ReceiverSettings struct containing more detailed
// settings.
- Receiver(setting ...LinkSetting) (Receiver, error)
+ Receiver(...LinkSetting) (Receiver, error)
// DefaultSession() returns a default session for the connection. It is opened
// on the first call to DefaultSession and returned on subsequent calls.
DefaultSession() (Session, error)
// Session opens a new session.
- Session() (Session, error)
+ Session(...SessionSetting) (Session, error)
// Container for the connection.
Container() Container
// Disconnect the connection abruptly with an error.
Disconnect(error)
-}
-// AcceptEndpoint pass to Connection.Listen to accept all endpoints
-func AcceptEndpoint(Endpoint) error { return nil }
+ // Wait waits for the connection to be disconnected.
+ Wait() error
-// RejectEndpoint pass to Connection.Listen to reject all endpoints
-func RejectEndpoint(Endpoint) error {
- return amqp.Errorf(amqp.NotAllowed, "remote open rejected")
+ // WaitTimeout is like Wait but returns Timeout if the timeout expires.
+ WaitTimeout(time.Duration) error
+}
+
+// ConnectionSetting can be passed when creating a connection.
+// See functions that return ConnectionSetting for details
+type ConnectionSetting func(*connection)
+
+// Server setting puts the connection in server mode.
+//
+// A server connection will do protocol negotiation to accept an incoming AMQP
+// connection. Normally you would call this for a connection created by
+// net.Listener.Accept()
+//
+func Server() ConnectionSetting { return func(c *connection) { c.engine.Server() } }
+
+// Accepter provides a function to be called when a connection receives an incoming
+// request to open an endpoint, one of IncomingSession, IncomingSender or IncomingReceiver.
+//
+// The accept() function must not block or use the accepted endpoint.
+// It can pass the endpoint to another goroutine for processing.
+//
+// By default all incoming endpoints are rejected.
+func Accepter(accept func(Incoming)) ConnectionSetting {
+ return func(c *connection) { c.accept = accept }
}
type connection struct {
endpoint
listenOnce, defaultSessionOnce, closeOnce sync.Once
- // Set before Open()
- container *container
- conn net.Conn
- accept func(Endpoint) error
-
- // Set by Open()
+ container *container
+ conn net.Conn
+ accept func(Incoming)
handler *handler
engine *proton.Engine
err internal.ErrorHolder
eConnection proton.Connection
defaultSession Session
+ done chan struct{}
}
-func newConnection(conn net.Conn, cont *container) (*connection, error) {
- c := &connection{container: cont, conn: conn, accept: RejectEndpoint}
+func newConnection(conn net.Conn, cont *container, setting ...ConnectionSetting) (*connection, error) {
+ c := &connection{container: cont, conn: conn, accept: func(Incoming) {}, done: make(chan struct{})}
c.handler = newHandler(c)
var err error
c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
if err != nil {
return nil, err
}
+ for _, set := range setting {
+ set(c)
+ }
c.str = c.engine.String()
c.eConnection = c.engine.Connection()
+ go func() { c.engine.Run(); close(c.done) }()
return c, nil
}
-func (c *connection) Server() { c.engine.Server() }
-
-func (c *connection) Listen(accept func(Endpoint) error) { c.accept = accept }
-
-func (c *connection) Open() error {
- go c.engine.Run()
- return nil
-}
-
-func (c *connection) Close(err error) { c.engine.Close(err) }
+func (c *connection) Close(err error) { c.err.Set(err); c.engine.Close(err) }
-func (c *connection) Disconnect(err error) { c.engine.Disconnect(err) }
+func (c *connection) Disconnect(err error) { c.err.Set(err); c.engine.Disconnect(err) }
-func (c *connection) closed(err error) {
- // Call from another goroutine to initiate close without deadlock.
- go c.Close(err)
-}
-
-func (c *connection) Session() (Session, error) {
+func (c *connection) Session(setting ...SessionSetting) (Session, error) {
var s Session
err := c.engine.InjectWait(func() error {
eSession, err := c.engine.Connection().Session()
if err == nil {
eSession.Open()
if err == nil {
- s = newSession(c, eSession)
+ s = newSession(c, eSession, setting...)
}
}
return err
@@ -189,3 +172,47 @@ func (c *connection) Receiver(setting ...LinkSetting) (Receiver, error) {
}
func (c *connection) Connection() Connection { return c }
+
+func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
+func (c *connection) WaitTimeout(timeout time.Duration) error {
+ _, err := timedReceive(c.done, timeout)
+ if err == Timeout {
+ return Timeout
+ }
+ return c.Error()
+}
+
+// Incoming is the interface for incoming requests to open an endpoint.
+// Implementing types are IncomingSession, IncomingSender and IncomingReceiver.
+type Incoming interface {
+ // Accept the endpoint with default settings.
+ //
+ // You must not use the returned endpoint in the accept() function that
+ // receives the Incoming value, but you can pass it to other goroutines.
+ //
+ // Implementing types provide type-specific Accept functions that take additional settings.
+ Accept() Endpoint
+
+ // Reject the endpoint with an error
+ Reject(error)
+
+ error() error
+}
+
+type incoming struct {
+ err error
+ accepted bool
+}
+
+func (i *incoming) Reject(err error) { i.err = err }
+
+func (i *incoming) error() error {
+ switch {
+ case i.err != nil:
+ return i.err
+ case !i.accepted:
+ return amqp.Errorf(amqp.NotAllowed, "remote open rejected")
+ default:
+ return nil
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
index 06a9a14..7bbc4b0 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
@@ -39,7 +39,7 @@ type Container interface {
// setting any Connection properties you need to set. Note the net.Conn
// can be an outgoing connection (e.g. made with net.Dial) or an incoming
// connection (e.g. made with net.Listener.Accept())
- Connection(conn net.Conn) (Connection, error)
+ Connection(net.Conn, ...ConnectionSetting) (Connection, error)
}
type container struct {
@@ -66,6 +66,6 @@ func (cont *container) nextLinkName() string {
return cont.id + "@" + cont.linkNames.Next()
}
-func (cont *container) Connection(conn net.Conn) (Connection, error) {
- return newConnection(conn, cont)
+func (cont *container) Connection(conn net.Conn, setting ...ConnectionSetting) (Connection, error) {
+ return newConnection(conn, cont, setting...)
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 1b07109..b518e42 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -27,7 +27,7 @@ import (
// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
type handler struct {
- delegator *proton.MessagingDelegator
+ delegator *proton.MessagingAdapter
connection *connection
links map[proton.Link]Link
sentMessages map[proton.Delivery]*sentMessage
@@ -41,8 +41,8 @@ func newHandler(c *connection) *handler {
sentMessages: make(map[proton.Delivery]*sentMessage),
sessions: make(map[proton.Session]*session),
}
- h.delegator = proton.NewMessagingDelegator(h)
- // Disable auto features of MessagingDelegator, we do these ourselves.
+ h.delegator = proton.NewMessagingAdapter(h)
+ // Disable auto features of MessagingAdapter, we do these ourselves.
h.delegator.Prefetch = 0
h.delegator.AutoAccept = false
h.delegator.AutoSettle = false
@@ -50,6 +50,10 @@ func newHandler(c *connection) *handler {
return h
}
+func (h *handler) internalError(fmt string, arg ...interface{}) {
+ proton.CloseError(h.connection.eConnection, amqp.Errorf(amqp.InternalError, fmt, arg...))
+}
+
func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
switch t {
@@ -57,9 +61,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
if r, ok := h.links[e.Link()].(*receiver); ok {
r.message(e.Delivery())
} else {
- proton.CloseError(
- h.connection.eConnection,
- amqp.Errorf(amqp.InternalError, "no receiver for link %s", e.Link()))
+ h.internalError("no receiver for link %s", e.Link())
}
case proton.MSettled:
@@ -68,19 +70,18 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
}
case proton.MSendable:
- h.trySend(e.Link())
+ if s, ok := h.links[e.Link()].(*sender); ok {
+ s.sendable()
+ } else {
+ h.internalError("no receiver for link %s", e.Link())
+ }
case proton.MSessionOpening:
if e.Session().State().LocalUninit() { // Remotely opened
- s := newSession(h.connection, e.Session())
- if err := h.connection.accept(s); err != nil {
- proton.CloseError(e.Session(), (err))
- } else {
- h.sessions[e.Session()] = s
- if s.capacity > 0 {
- e.Session().SetIncomingCapacity(s.capacity)
- }
- e.Session().Open()
+ incoming := &IncomingSession{h: h, pSession: e.Session()}
+ h.connection.accept(incoming)
+ if err := incoming.error(); err != nil {
+ proton.CloseError(e.Session(), err)
}
}
@@ -95,33 +96,24 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
case proton.MLinkOpening:
l := e.Link()
- if l.State().LocalUninit() { // Remotely opened
- ss := h.sessions[l.Session()]
- if ss == nil {
- proton.CloseError(
- l, amqp.Errorf(amqp.InternalError, ("no session for link")))
- break
- }
- var link Link
- if l.IsReceiver() {
- r := &receiver{link: incomingLink(ss, l)}
- link = r
- r.inAccept = true
- defer func() { r.inAccept = false }()
- } else {
- link = &sender{link: incomingLink(ss, l)}
- }
- if err := h.connection.accept(link); err != nil {
- proton.CloseError(l, err)
- break
- }
- link.open()
+ if l.State().LocalActive() { // Already opened locally.
+ break
}
-
- case proton.MLinkOpened:
- l := e.Link()
- if l.IsSender() {
- h.trySend(l)
+ ss := h.sessions[l.Session()]
+ if ss == nil {
+ h.internalError("no session for link %s", e.Link())
+ break
+ }
+ var incoming Incoming
+ if l.IsReceiver() {
+ incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
+ } else {
+ incoming = &IncomingSender{makeIncomingLink(ss, l)}
+ }
+ h.connection.accept(incoming)
+ if err := incoming.error(); err != nil {
+ proton.CloseError(l, err)
+ break
}
case proton.MLinkClosing:
@@ -130,7 +122,17 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
case proton.MLinkClosed:
h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
+ case proton.MConnectionClosing:
+ h.connection.err.Set(e.Connection().RemoteCondition().Error())
+
+ case proton.MConnectionClosed:
+ h.connection.err.Set(Closed) // If no error already set, this is an orderly close.
+
case proton.MDisconnected:
+ h.connection.err.Set(e.Transport().Condition().Error())
+ // If err not set at this point (e.g. to Closed) then this is unexpected.
+ h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
+
err := h.connection.Error()
for l, _ := range h.links {
h.linkClosed(l, err)
@@ -154,19 +156,3 @@ func (h *handler) linkClosed(l proton.Link, err error) {
func (h *handler) addLink(rl proton.Link, ll Link) {
h.links[rl] = ll
}
-
-func (h *handler) trySend(l proton.Link) {
- if l.Credit() <= 0 {
- return
- }
- if s, ok := h.links[l].(*sender); ok {
- for ch := s.popBlocked(); l.Credit() > 0 && ch != nil; ch = s.popBlocked() {
- if snd, ok := <-ch; ok {
- s.doSend(snd)
- }
- }
- } else {
- h.connection.closed(
- amqp.Errorf(amqp.InternalError, "cannot find sender for link %s", l))
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index abc8431..4bef53b 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -60,39 +60,39 @@ type Link interface {
open()
}
-// LinkSetting is a function that sets a link property. Passed when creating
-// a Sender or Receiver, do not use at any other time.
-type LinkSetting func(Link)
+// LinkSetting can be passed when creating a sender or receiver.
+// See functions that return LinkSetting for details
+type LinkSetting func(*link)
// Source sets address that messages are coming from.
-func Source(s string) LinkSetting { return func(l Link) { l.(*link).source = s } }
+func Source(s string) LinkSetting { return func(l *link) { l.source = s } }
// Target sets address that messages are going to.
-func Target(s string) LinkSetting { return func(l Link) { l.(*link).target = s } }
+func Target(s string) LinkSetting { return func(l *link) { l.target = s } }
// LinkName sets the link name.
-func LinkName(s string) LinkSetting { return func(l Link) { l.(*link).target = s } }
+func LinkName(s string) LinkSetting { return func(l *link) { l.target = s } }
// SndSettle sets the send settle mode
-func SndSettle(m SndSettleMode) LinkSetting { return func(l Link) { l.(*link).sndSettle = m } }
+func SndSettle(m SndSettleMode) LinkSetting { return func(l *link) { l.sndSettle = m } }
// RcvSettle sets the receive settle mode
-func RcvSettle(m RcvSettleMode) LinkSetting { return func(l Link) { l.(*link).rcvSettle = m } }
+func RcvSettle(m RcvSettleMode) LinkSetting { return func(l *link) { l.rcvSettle = m } }
// SndSettleMode defines when the sending end of the link settles message delivery.
type SndSettleMode proton.SndSettleMode
// Capacity sets the link capacity
-func Capacity(n int) LinkSetting { return func(l Link) { l.(*link).capacity = n } }
+func Capacity(n int) LinkSetting { return func(l *link) { l.capacity = n } }
// Prefetch sets a receiver's pre-fetch flag. Not relevant for a sender.
-func Prefetch(p bool) LinkSetting { return func(l Link) { l.(*link).prefetch = p } }
+func Prefetch(p bool) LinkSetting { return func(l *link) { l.prefetch = p } }
// AtMostOnce sets "fire and forget" mode, messages are sent but no
// acknowledgment is received, messages can be lost if there is a network
// failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
func AtMostOnce() LinkSetting {
- return func(l Link) {
+ return func(l *link) {
SndSettle(SndSettled)(l)
RcvSettle(RcvFirst)(l)
}
@@ -104,7 +104,7 @@ func AtMostOnce() LinkSetting {
// that the message will be received twice in this case.
// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
func AtLeastOnce() LinkSetting {
- return func(l Link) {
+ return func(l *link) {
SndSettle(SndUnsettled)(l)
RcvSettle(RcvFirst)(l)
}
@@ -145,8 +145,6 @@ type link struct {
session *session
eLink proton.Link
done chan struct{} // Closed when link is closed
-
- inAccept bool
}
func (l *link) Source() string { return l.source }
@@ -163,8 +161,8 @@ func (l *link) engine() *proton.Engine { return l.session.connection.engine }
func (l *link) handler() *handler { return l.session.connection.handler }
// Set up link fields and open the proton.Link
-func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error) {
- l := &link{
+func localLink(sn *session, isSender bool, setting ...LinkSetting) (link, error) {
+ l := link{
session: sn,
isSender: isSender,
capacity: 1,
@@ -172,7 +170,7 @@ func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error
done: make(chan struct{}),
}
for _, set := range setting {
- set(l)
+ set(&l)
}
if l.linkName == "" {
l.linkName = l.session.connection.container.nextLinkName()
@@ -184,7 +182,7 @@ func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error
}
if l.eLink.IsNil() {
l.err.Set(internal.Errorf("cannot create link %s", l))
- return nil, l.err.Get()
+ return l, l.err.Get()
}
l.eLink.Source().SetAddress(l.source)
l.eLink.Target().SetAddress(l.target)
@@ -195,20 +193,27 @@ func localLink(sn *session, isSender bool, setting ...LinkSetting) (*link, error
return l, nil
}
+type incomingLink struct {
+ incoming
+ link
+}
+
// Set up a link from an incoming proton.Link.
-func incomingLink(sn *session, eLink proton.Link) link {
- l := link{
- session: sn,
- isSender: eLink.IsSender(),
- eLink: eLink,
- source: eLink.RemoteSource().Address(),
- target: eLink.RemoteTarget().Address(),
- linkName: eLink.Name(),
- sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
- rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
- capacity: 1,
- prefetch: false,
- done: make(chan struct{}),
+func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
+ l := incomingLink{
+ link: link{
+ session: sn,
+ isSender: eLink.IsSender(),
+ eLink: eLink,
+ source: eLink.RemoteSource().Address(),
+ target: eLink.RemoteTarget().Address(),
+ linkName: eLink.Name(),
+ sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
+ rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
+ capacity: 1,
+ prefetch: false,
+ done: make(chan struct{}),
+ },
}
l.str = eLink.String()
return l
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
index 92c0b90..59ac018 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -20,10 +20,9 @@ under the License.
package electron
import (
+ "qpid.apache.org/amqp"
"qpid.apache.org/internal"
"qpid.apache.org/proton"
- "qpid.apache.org/amqp"
- "sync"
"time"
)
@@ -63,10 +62,6 @@ type Receiver interface {
// Capacity is the size (number of messages) of the local message buffer
// These are messages received but not yet returned to the application by a call to Receive()
Capacity() int
-
- // SetCapacity sets Capacity and Prefetch of an accepted Receiver.
- // May only be called in an accept() function, see Connection.Listen()
- SetCapacity(capacity int, prefetch bool)
}
// Flow control policy for a receiver.
@@ -120,18 +115,12 @@ func (p noPrefetchPolicy) Post(r *receiver, err error) {
// Receiver implementation
type receiver struct {
link
- buffer chan ReceivedMessage
- policy policy
- setupOnce sync.Once
-}
-
-func (r *receiver) SetCapacity(capacity int, prefetch bool) {
- internal.Assert(r.inAccept, "Receiver.SetCapacity called outside of accept function")
- r.capacity = capacity
- r.prefetch = prefetch
+ buffer chan ReceivedMessage
+ policy policy
}
-func (r *receiver) setup() {
+func newReceiver(l link) *receiver {
+ r := &receiver{link: l}
if r.capacity < 1 {
r.capacity = 1
}
@@ -141,6 +130,9 @@ func (r *receiver) setup() {
r.policy = &noPrefetchPolicy{}
}
r.buffer = make(chan ReceivedMessage, r.capacity)
+ r.handler().addLink(r.eLink, r)
+ r.link.open()
+ return r
}
// call in proton goroutine.
@@ -156,15 +148,14 @@ func (r *receiver) Receive() (rm ReceivedMessage, err error) {
}
func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
- r.setupOnce.Do(r.setup)
internal.Assert(r.buffer != nil, "Receiver is not open: %s", r)
r.policy.Pre(r)
defer func() { r.policy.Post(r, err) }()
- rmi, ok, timedout := timedReceive(r.buffer, timeout)
- switch {
- case timedout:
+ rmi, err := timedReceive(r.buffer, timeout)
+ switch err {
+ case Timeout:
return ReceivedMessage{}, Timeout
- case !ok:
+ case Closed:
return ReceivedMessage{}, r.Error()
default:
return rmi.(ReceivedMessage), nil
@@ -194,12 +185,6 @@ func (r *receiver) message(delivery proton.Delivery) {
}
}
-func (r *receiver) open() {
- r.setupOnce.Do(r.setup)
- r.link.open()
- r.handler().addLink(r.eLink, r)
-}
-
func (r *receiver) closed(err error) {
r.link.closed(err)
if r.buffer != nil {
@@ -230,3 +215,24 @@ func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
// Reject is short for Acknowledge(Rejected)
func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
+
+// IncomingReceiver is passed to the accept() function set with the Accepter()
+// connection setting when there is an incoming request for a receiver link.
+type IncomingReceiver struct {
+ incomingLink
+}
+
+// Link provides information about the incoming link.
+func (i *IncomingReceiver) Link() Link { return i }
+
+// AcceptReceiver sets Capacity and Prefetch of the accepted Receiver.
+func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver {
+ i.capacity = capacity
+ i.prefetch = prefetch
+ return i.Accept().(Receiver)
+}
+
+func (i *IncomingReceiver) Accept() Endpoint {
+ i.accepted = true
+ return newReceiver(i.link)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
index 3124f74..68cfbb3 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -23,10 +23,9 @@ package electron
import "C"
import (
- "container/list"
+ "qpid.apache.org/amqp"
"qpid.apache.org/internal"
"qpid.apache.org/proton"
- "qpid.apache.org/amqp"
"reflect"
"time"
)
@@ -69,7 +68,7 @@ type sendMessage struct {
type sender struct {
link
- blocked list.List // Channel of sendMessage for blocked senders.
+ credit chan struct{} // Signal available credit.
}
// Disposition indicates the outcome of a settled message delivery.
@@ -102,31 +101,6 @@ func (d Disposition) String() string {
}
}
-// Send a message, assumes there is credit
-func (s *sender) doSend(snd sendMessage) {
- delivery, err := s.eLink.Send(snd.m)
- switch sm := snd.sm.(type) {
- case nil:
- delivery.Settle()
- case *sentMessage:
- sm.delivery = delivery
- if err != nil {
- sm.settled(err)
- } else {
- s.handler().sentMessages[delivery] = sm
- }
- default:
- internal.Assert(false, "bad SentMessage type %T", snd.sm)
- }
-}
-
-func (s *sender) popBlocked() chan sendMessage {
- if s.blocked.Len() > 0 {
- return s.blocked.Remove(s.blocked.Front()).(chan sendMessage)
- }
- return nil
-}
-
func (s *sender) Send(m amqp.Message) (SentMessage, error) {
return s.SendTimeout(m, Forever)
}
@@ -152,52 +126,58 @@ func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error
}
func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) {
- if s.Error() != nil {
- return nil, s.Error()
- }
- var err error
- if timeout == 0 {
- err = s.engine().InjectWait(func() error {
- if s.eLink.Credit() > 0 {
- s.doSend(snd)
- return nil
- }
- return Timeout
- })
- } else {
- buf := make(chan sendMessage)
- done := make(chan struct{})
- defer close(buf)
- s.engine().Inject(func() { // Runs concurrently
- if s.eLink.Credit() > 0 {
- s.doSend(snd)
- close(done) // Signal already sent
- } else {
- s.blocked.PushBack(buf)
- }
- })
- select {
- case <-done: // Sent without blocking
- case buf <- snd: // Sent via blocking channel
- case <-s.done:
+ if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit
+ if err == Closed {
err = s.Error()
- case <-After(timeout):
- err = Timeout
+ internal.Assert(err != nil)
}
+ return nil, err
}
- if err != nil {
+ if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil {
return nil, err
}
return snd.sm, nil
}
+// Send a message. Handler goroutine
+func (s *sender) doSend(snd sendMessage) {
+ delivery, err := s.eLink.Send(snd.m)
+ switch sm := snd.sm.(type) {
+ case nil:
+ delivery.Settle()
+ case *sentMessage:
+ sm.delivery = delivery
+ if err != nil {
+ sm.settled(err)
+ } else {
+ s.handler().sentMessages[delivery] = sm
+ }
+ default:
+ internal.Assert(false, "bad SentMessage type %T", snd.sm)
+ }
+ if s.eLink.Credit() > 0 {
+ s.sendable() // Signal credit.
+ }
+}
+
+// Signal the sender has credit. Any goroutine.
+func (s *sender) sendable() {
+ select { // Non-blocking
+ case s.credit <- struct{}{}: // Set the flag if not already set.
+ default:
+ }
+}
+
func (s *sender) closed(err error) {
s.link.closed(err)
+ close(s.credit)
}
-func (s *sender) open() {
- s.link.open()
+func newSender(l link) *sender {
+ s := &sender{link: l, credit: make(chan struct{}, 1)}
s.handler().addLink(s.eLink, s)
+ s.link.open()
+ return s
}
// SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
@@ -285,7 +265,7 @@ func (sm *sentMessage) Disposition() (Disposition, error) {
}
func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
- if _, _, timedout := timedReceive(sm.done, timeout); timedout {
+ if _, err := timedReceive(sm.done, timeout); err == Timeout {
return sm.disposition, Timeout
} else {
return sm.disposition, sm.err
@@ -317,3 +297,19 @@ func (sm *sentMessage) finish() {
}
func (sm *sentMessage) Error() error { return sm.err }
+
+// IncomingSender is passed to the accept() function set with the Accepter()
+// connection setting when there is an incoming request for a sender link.
+type IncomingSender struct {
+ incomingLink
+}
+
+// Link provides information about the incoming link.
+func (i *IncomingSender) Link() Link { return i }
+
+func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) }
+
+func (i *IncomingSender) Accept() Endpoint {
+ i.accepted = true
+ return newSender(i.link)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
index 612658a..3531da6 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -24,7 +24,6 @@ import (
)
// Session is an AMQP session, it contains Senders and Receivers.
-//
type Session interface {
Endpoint
@@ -36,10 +35,6 @@ type Session interface {
// Source address, or a ReceiverSettings struct containing more details
// settings.
Receiver(...LinkSetting) (Receiver, error)
-
- // SetCapacity sets the session buffer capacity in bytes.
- // Only has effect if called in an accept() function, see Connection.Listen()
- SetCapacity(bytes uint)
}
type session struct {
@@ -49,13 +44,27 @@ type session struct {
capacity uint
}
+// SessionSetting can be passed when creating a session.
+// See functions that return SessionSetting for details
+type SessionSetting func(*session)
+
+// IncomingCapacity sets the size (in bytes) of the session's incoming data buffer.
+func IncomingCapacity(cap uint) SessionSetting { return func(s *session) { s.capacity = cap } }
+
// in proton goroutine
-func newSession(c *connection, es proton.Session) *session {
- return &session{
+func newSession(c *connection, es proton.Session, setting ...SessionSetting) *session {
+ s := &session{
connection: c,
eSession: es,
endpoint: endpoint{str: es.String()},
}
+ for _, set := range setting {
+ set(s)
+ }
+ c.handler.sessions[s.eSession] = s
+ s.eSession.SetIncomingCapacity(s.capacity)
+ s.eSession.Open()
+ return s
}
func (s *session) Connection() Connection { return s.connection }
@@ -71,8 +80,7 @@ func (s *session) Sender(setting ...LinkSetting) (snd Sender, err error) {
err = s.engine().InjectWait(func() error {
l, err := localLink(s, true, setting...)
if err == nil {
- snd = &sender{link: *l}
- snd.(*sender).open()
+ snd = newSender(l)
}
return err
})
@@ -83,8 +91,7 @@ func (s *session) Receiver(setting ...LinkSetting) (rcv Receiver, err error) {
err = s.engine().InjectWait(func() error {
l, err := localLink(s, false, setting...)
if err == nil {
- rcv = &receiver{link: *l}
- rcv.(*receiver).open()
+ rcv = newReceiver(l)
}
return err
})
@@ -96,3 +103,23 @@ func (s *session) closed(err error) {
s.err.Set(err)
s.err.Set(Closed)
}
+
+// IncomingSession is passed to the accept() function set with the Accepter()
+// connection setting when there is an incoming session request.
+type IncomingSession struct {
+ incoming
+ h *handler
+ pSession proton.Session
+ capacity uint
+}
+
+// AcceptSession accepts the incoming session, setting its buffer capacity in bytes.
+func (i *IncomingSession) AcceptSession(bytes uint) Session {
+ i.capacity = bytes
+ return i.Accept().(Session)
+}
+
+func (i *IncomingSession) Accept() Endpoint {
+ i.accepted = true
+ return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity))
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
index ee61332..3407b82 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
@@ -49,24 +49,25 @@ const Forever time.Duration = -1
// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block
// forever. Other values mean block up to the timeout.
//
-func timedReceive(channel interface{}, timeout time.Duration) (value interface{}, ok bool, timedout bool) {
+// Returns error Timeout on timeout, Closed on channel close.
+func timedReceive(channel interface{}, timeout time.Duration) (interface{}, error) {
cases := []reflect.SelectCase{
reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)},
}
- switch {
- case timeout == 0: // Non-blocking receive
+ if timeout == 0 { // Non-blocking
cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})
- case timeout == Forever: // Block forever, nothing to add
- default: // Block up to timeout
+ } else { // Block up to timeout
cases = append(cases,
- reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(time.After(timeout))})
+ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
}
- chosen, recv, recvOk := reflect.Select(cases)
+ chosen, value, ok := reflect.Select(cases)
switch {
+ case !ok:
+ return nil, Closed
case chosen == 0:
- return recv.Interface(), recvOk, false
+ return value.Interface(), nil
default:
- return nil, false, true
+ return nil, Timeout
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
index e9d6d6f..51f70f8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
@@ -23,9 +23,9 @@ You can write clients and servers using this library.
This package is a port of the Proton C API into Go (see
http://qpid.apache.org/proton) Go programmers may find the 'electron' package
-more convenient, it provides a concurrent-safe API that allows you to do
-procedural loops in goroutines rather than implementing event handlers that must
-run in a single goroutine.
+more convenient. qpid.apache.org/electron provides a concurrent-safe API that
+allows you to run procedural loops in arbitrary goroutines rather than
+implementing event handlers that must run in a single goroutine.
Consult the C API documentation at http://qpid.apache.org/proton for more
information about the types here. There is a 1-1 correspondence between C type
@@ -42,21 +42,24 @@ goroutine that feeds events to a proton.MessagingHandler, which you must impleme
See the Engine documentation for more.
MessagingHandler defines an event handling interface that you can implement to
-react to AMQP protocol events. (There is also a lower-level EventHandler, but
-MessagingHandler provides a simpler set of events and automates common tasks for you.)
+react to AMQP protocol events. There is also a lower-level EventHandler, but
+MessagingHandler provides a simpler set of events and automates common tasks for you;
+for most applications it will be more convenient.
-All events generated by proton are handled in the single event-loop goroutine
-associated with the Connection and Engine. You can use Engine.Inject() or
-Engine.InjectWait() to inject additional functions into the event loop. Only
-injected functions or handler functions can use proton types (such as Session,
-Link etc.) Handlers and injected functions can set up channels to communicate
-with other goroutines..
+NOTE: Methods on most types defined in this package (Sessions, Links etc.) can
+*only* be called in the event handler goroutine of the relevant
+Connection/Engine, either by the HandleEvent method of a handler type or in a
+function injected into the goroutine via Inject() or InjectWait(). Handlers and
+injected functions can set up channels to communicate with other goroutines.
+Note the Injecter associated with a handler is available as part of the Event value
+passed to HandleEvent.
-Separate Connection and Engine instances are independent, and can run concurrently.
+Separate Engine instances are independent, and can run concurrently.
The 'electron' package is built on the proton package but instead offers a
concurrent-safe API that can use simple procedural loops rather than event
-handlers to express application logic. It may be easier to use w for some applications.
+handlers to express application logic. It is easier to use for most
+applications.
*/
package proton
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 5dc8727..2cebb49 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -39,15 +39,26 @@ import (
"unsafe"
)
-// Injecter allows functions to be "injected" into an event-processing loop.
+// Injecter allows functions to be "injected" into the event-processing loop, to
+// be called in the same goroutine as event handlers.
type Injecter interface {
- // Inject a function into an event-loop concurrency context.
+ // Inject a function into the engine goroutine.
//
- // f() will be called in the same concurrency context as event handers, so it
- // can safely use values that can used be used in that context. If f blocks it
- // will block the event loop so be careful calling blocking functions in f.
+ // f() will be called in the same goroutine as event handlers, so it can safely
+ // use values belonging to event handlers without synchronization. f() should
+ // not block, no further events or injected functions can be processed until
+ // f() returns.
//
- // Returns a non-nil error if the function could not be injected.
+ // Returns a non-nil error if the function could not be injected and will
+ // never be called. Otherwise the function will eventually be called.
+ //
+ // Note that proton values (Link, Session, Connection etc.) that existed when
+ // Inject(f) was called may have become invalid by the time f() is executed.
+ // Handlers should keep track of Closed events to ensure proton values
+ // are not used after they become invalid. One technique is to have a map from
+ // proton values to application values. Check that the map has the correct
+ // proton/application value pair at the start of the injected function and
+ // delete the value from the map when handling a Closed event.
Inject(f func()) error
// InjectWait is like Inject but does not return till f() has completed.
@@ -72,19 +83,14 @@ func (b *bufferChan) buffer() []byte {
}
// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
-// Handler functions in a single event-loop goroutine. Actions taken by Handler
-// functions (such as sending messages) are encoded and written to the
-// net.Conn. Create a engine with NewEngine()
-//
-// The Engine runs a proton event loop in the goroutine that calls Engine.Run()
-// and creates goroutines to feed data to/from a net.Conn. You can create
-// multiple Engines to handle multiple connections concurrently.
+// Handler functions sequentially in a single goroutine. Actions taken by
+// Handler functions (such as sending messages) are encoded and written to the
+// net.Conn. You can create multiple Engines to handle multiple connections
+// concurrently.
//
-// Methods on proton values defined in this package (Sessions, Links etc.) can
-// only be called in the goroutine that executes the corresponding
-// Engine.Run(). You implement the EventHandler or MessagingHandler interfaces
-// and provide those values to NewEngine(). Their HandleEvent method will be
-// called in the Engine goroutine, in typical event-driven style.
+// You implement the EventHandler and/or MessagingHandler interfaces and provide
+// those values to NewEngine(). Their HandleEvent method will be called in the
+// event-handling goroutine.
//
// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
// other goroutines, store them, or use them as map indexes. Effectively they are
@@ -161,7 +167,7 @@ func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
}
C.pn_connection_collect(eng.connection.pn, eng.collector)
eng.connection.Open()
- connectionContexts.Put(eng.connection, connectionContext{eng, eng.String()})
+ connectionContexts.Put(eng.connection, connectionContext{eng.String()})
return eng, nil
}
@@ -223,6 +229,7 @@ func (eng *Engine) Server() { eng.transport.SetServer() }
// Close the engine's connection, returns when the engine has exited.
func (eng *Engine) Close(err error) {
+ eng.err.Set(err)
eng.Inject(func() {
CloseError(eng.connection, err)
})
@@ -231,9 +238,7 @@ func (eng *Engine) Close(err error) {
// Disconnect the engine's connection without and AMQP close, returns when the engine has exited.
func (eng *Engine) Disconnect(err error) {
- if err != nil {
- eng.err.Set(err)
- }
+ eng.err.Set(err)
eng.conn.Close()
<-eng.running
}
@@ -281,8 +286,8 @@ func (eng *Engine) Run() error {
}()
wbuf := eng.write.buffer()[:0]
-loop:
- for {
+
+ for eng.err.Get() == nil {
if len(wbuf) == 0 {
eng.pop(&wbuf)
}
@@ -309,13 +314,12 @@ loop:
eng.netError(err)
}
eng.process()
- if eng.err.Get() != nil {
- break loop
- }
}
close(eng.write.buffers)
eng.conn.Close() // Make sure connection is closed
wait.Wait()
+ close(eng.running) // Signal goroutines have exited and Error is set.
+
connectionContexts.Delete(eng.connection)
if !eng.connection.IsNil() {
eng.connection.Free()
@@ -332,15 +336,10 @@ loop:
C.pn_handler_free(h.pn)
}
}
- close(eng.running) // Signal goroutines have exited and Error is set.
return eng.err.Get()
}
func (eng *Engine) netError(err error) {
- if err == nil {
- err = internal.Errorf("unknown network error")
- }
- eng.conn.Close() // Make sure both sides are closed
eng.err.Set(err)
eng.transport.CloseHead()
eng.transport.CloseTail()
@@ -389,17 +388,13 @@ func (eng *Engine) handle(e Event) {
h.HandleEvent(e)
}
if e.Type() == ETransportClosed {
- eng.err.Set(e.Connection().RemoteCondition().Error())
- eng.err.Set(e.Connection().Transport().Condition().Error())
- if eng.err.Get() == nil {
- eng.err.Set(io.EOF)
- }
+ eng.err.Set(io.EOF)
}
}
func (eng *Engine) process() {
for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) {
- eng.handle(makeEvent(ce))
+ eng.handle(makeEvent(ce, eng))
C.pn_collector_pop(eng.collector)
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
index 8a5cbf8..53b744c 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/handlers.go
@@ -191,7 +191,7 @@ type endpointDelegator struct {
remoteOpen, remoteClose, localOpen, localClose EventType
opening, opened, closing, closed, error MessagingEvent
endpoint func(Event) Endpoint
- delegator *MessagingDelegator
+ delegator *MessagingAdapter
}
// HandleEvent handles an open/close event for an endpoint in a generic way.
@@ -240,10 +240,10 @@ func (d endpointDelegator) HandleEvent(e Event) {
}
}
-// MessagingDelegator implments a EventHandler and delegates to a MessagingHandler.
-// You can modify the exported fields before you pass the MessagingDelegator to
+// MessagingAdapter implements an EventHandler and delegates to a MessagingHandler.
+// You can modify the exported fields before you pass the MessagingAdapter to
// a Engine.
-type MessagingDelegator struct {
+type MessagingAdapter struct {
mhandler MessagingHandler
connection, session, link endpointDelegator
flowcontroller EventHandler
@@ -261,8 +261,8 @@ type MessagingDelegator struct {
PeerCloseError bool
}
-func NewMessagingDelegator(h MessagingHandler) *MessagingDelegator {
- return &MessagingDelegator{
+func NewMessagingAdapter(h MessagingHandler) *MessagingAdapter {
+ return &MessagingAdapter{
mhandler: h,
flowcontroller: nil,
AutoSettle: true,
@@ -281,7 +281,7 @@ func handleIf(h EventHandler, e Event) {
// Handle a proton event by passing the corresponding MessagingEvent(s) to
// the MessagingHandler.
-func (d *MessagingDelegator) HandleEvent(e Event) {
+func (d *MessagingAdapter) HandleEvent(e Event) {
handleIf(d.flowcontroller, e)
switch e.Type() {
@@ -352,7 +352,7 @@ func (d *MessagingDelegator) HandleEvent(e Event) {
}
}
-func (d *MessagingDelegator) incoming(e Event) (err error) {
+func (d *MessagingAdapter) incoming(e Event) (err error) {
delivery := e.Delivery()
if delivery.HasMessage() {
d.mhandler.HandleMessagingEvent(MMessage, e)
@@ -368,7 +368,7 @@ func (d *MessagingDelegator) incoming(e Event) (err error) {
return
}
-func (d *MessagingDelegator) outgoing(e Event) (err error) {
+func (d *MessagingAdapter) outgoing(e Event) (err error) {
delivery := e.Delivery()
if delivery.Updated() {
switch delivery.Remote().Type() {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 7d40890..45a6722 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -24,20 +24,20 @@ package proton
//#include <proton/codec.h>
//#include <proton/connection.h>
-//#include <proton/session.h>
-//#include <proton/session.h>
//#include <proton/delivery.h>
-//#include <proton/link.h>
//#include <proton/event.h>
-//#include <proton/transport.h>
//#include <proton/link.h>
+//#include <proton/link.h>
+//#include <proton/object.h>
+//#include <proton/session.h>
+//#include <proton/transport.h>
//#include <stdlib.h>
import "C"
import (
"fmt"
- "qpid.apache.org/internal"
"qpid.apache.org/amqp"
+ "qpid.apache.org/internal"
"reflect"
"time"
"unsafe"
@@ -45,34 +45,75 @@ import (
// TODO aconway 2015-05-05: Documentation for generated types.
+// CHandle holds an unsafe.Pointer to a proton C struct, the C type depends on the
+// Go type implementing this interface. For low level, at-your-own-risk use only.
+type CHandle interface {
+ // CPtr returns the unsafe C pointer, equivalent to a C void*.
+ CPtr() unsafe.Pointer
+}
+
+// Incref increases the refcount of a proton value, which prevents the
+// underlying C struct being freed until you call Decref().
+//
+// It can be useful to "pin" a proton value in memory while it is in use by
+// goroutines other than the event loop goroutine. For example if you Incref() a
+// Link, the underlying object is not freed when the link is closed, so
+// other goroutines can continue to safely use it as an index in a map or inject
+// it into the event loop goroutine. There will of course be an error if you try
+// to use a link after it is closed, but not a segmentation fault.
+func Incref(c CHandle) {
+ if p := c.CPtr(); p != nil {
+ C.pn_incref(p)
+ }
+}
+
+// Decref decreases the refcount of a proton value, freeing the underlying C
+// struct if this is the last reference. Only call this if you previously
+// called Incref() for this value.
+func Decref(c CHandle) {
+ if p := c.CPtr(); p != nil {
+ C.pn_decref(p)
+ }
+}
+
// Event is an AMQP protocol event.
type Event struct {
pn *C.pn_event_t
eventType EventType
connection Connection
+ transport Transport
session Session
link Link
delivery Delivery
+ injecter Injecter
}
-func makeEvent(pn *C.pn_event_t) Event {
+func makeEvent(pn *C.pn_event_t, injecter Injecter) Event {
return Event{
pn: pn,
eventType: EventType(C.pn_event_type(pn)),
connection: Connection{C.pn_event_connection(pn)},
+ transport: Transport{C.pn_event_transport(pn)},
session: Session{C.pn_event_session(pn)},
link: Link{C.pn_event_link(pn)},
delivery: Delivery{C.pn_event_delivery(pn)},
+ injecter: injecter,
}
}
func (e Event) IsNil() bool { return e.eventType == EventType(0) }
func (e Event) Type() EventType { return e.eventType }
func (e Event) Connection() Connection { return e.connection }
+func (e Event) Transport() Transport { return e.transport }
func (e Event) Session() Session { return e.session }
func (e Event) Link() Link { return e.link }
func (e Event) Delivery() Delivery { return e.delivery }
func (e Event) String() string { return e.Type().String() }
+// Injecter should not be used in a handler function, but it can be passed to
+// other goroutines (via a channel or to a goroutine started by handler
+// functions) to let them inject functions back into the handlers goroutine.
+func (e Event) Injecter() Injecter { return e.injecter }
+
// Data holds a pointer to decoded AMQP data.
// Use amqp.marshal/unmarshal to access it as Go data types.
//
@@ -132,12 +173,14 @@ type Endpoint interface {
String() string
}
-// CloseError sets an error condition on an endpoint and closes the endpoint.
+// CloseError sets an error condition on an endpoint and closes the endpoint (if not already closed)
func CloseError(e Endpoint, err error) {
if err != nil {
e.Condition().SetError(err)
}
- e.Close()
+ if e.State().RemoteActive() {
+ e.Close()
+ }
}
// EndpointError returns the remote error if there is one, the local error if not
@@ -204,6 +247,20 @@ func (l Link) Delivery(tag string) Delivery {
return Delivery{C.pn_delivery(l.pn, pnTag(tag))}
}
+func (l Link) Connection() Connection { return l.Session().Connection() }
+
+// Human-readable link description including name, source, target and direction.
+func (l Link) String() string {
+ switch {
+ case l.IsNil():
+ return fmt.Sprintf("<nil-link>")
+ case l.IsSender():
+ return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address())
+ default:
+ return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address())
+ }
+}
+
func cPtr(b []byte) *C.char {
if len(b) == 0 {
return nil
@@ -229,20 +286,11 @@ func (s Session) Receiver(name string) Link {
// Context information per connection.
type connectionContext struct {
- injecter Injecter
- str string
+ str string
}
var connectionContexts = internal.MakeSafeMap()
-// Injecter for event-loop associated with this connection.
-func (c Connection) Injecter() Injecter {
- if cc, ok := connectionContexts.Get(c).(connectionContext); ok {
- return cc.injecter
- }
- return nil
-}
-
// Unique (per process) string identifier for a connection, useful for debugging.
func (c Connection) String() string {
if cc, ok := connectionContexts.Get(c).(connectionContext); ok {
@@ -279,20 +327,6 @@ func (s Session) String() string {
return fmt.Sprintf("%s/%p", s.Connection(), s.pn)
}
-func (l Link) Connection() Connection { return l.Session().Connection() }
-
-// Human-readable link description including name, source, target and direction.
-func (l Link) String() string {
- switch {
- case l.IsNil():
- return fmt.Sprintf("<nil-link>")
- case l.IsSender():
- return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address())
- default:
- return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address())
- }
-}
-
// Error returns an instance of amqp.Error or nil.
func (c Condition) Error() error {
if c.IsNil() || !c.IsSet() {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/bc0a242e/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java
----------------------------------------------------------------------
diff --git a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java
index ac28b32..ef09837 100644
--- a/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java
+++ b/proton-j/src/main/java/org/apache/qpid/proton/amqp/messaging/Terminus.java
@@ -48,7 +48,7 @@ public abstract class Terminus
_dynamic = other._dynamic;
if (other._dynamicNodeProperties != null) {
// TODO: Do we need to copy or can we make a simple reference?
- _dynamicNodeProperties = new HashMap(other._dynamicNodeProperties); // FIXME
+ _dynamicNodeProperties = new HashMap(other._dynamicNodeProperties); // yFIXME
}
if (other._capabilities != null) {
_capabilities = other._capabilities.clone(); // FIXME?
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org