You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/10/08 06:30:37 UTC
[5/5] qpid-proton git commit: PROTON-1011: Go example of event driven
broker. Package renaming and some new features.
PROTON-1011: Go example of event driven broker. Package renaming and some new features.
New pacakges names:
- qpid.apache.org/amqp - amqp/Go data mapping
- qpid.apache.org/proton - faithful wrapper of proton C library
- qpid.apache.org/electron - alternative, procedural, concurrent-safe Go API
Simplified broker examples, complete proton and electron brokers for comparison.
- Send blocks for credit, added SendTimeout.
- Fixed some shut-down issues
- Session flow control.
- Additional unit tests
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/478ba4ea
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/478ba4ea
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/478ba4ea
Branch: refs/heads/master
Commit: 478ba4ea1e7a2c5e60ac8f64d6756f2e6658663b
Parents: 2789615
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Sep 29 17:29:22 2015 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 8 00:30:09 2015 -0400
----------------------------------------------------------------------
examples/go/CMakeLists.txt | 15 +-
examples/go/README.md | 24 +-
examples/go/broker.go | 161 ------
examples/go/electron/broker.go | 148 +++++
examples/go/electron/receive.go | 122 ++++
examples/go/electron/send.go | 123 ++++
examples/go/example_test.go | 6 +-
examples/go/proton/broker.go | 299 ++++++++++
examples/go/receive.go | 119 ----
examples/go/send.go | 121 ----
examples/go/util/queue.go | 89 +--
examples/go/util/util.go | 2 +-
proton-c/bindings/go/CMakeLists.txt | 4 +-
proton-c/bindings/go/README.md | 151 +----
.../bindings/go/src/qpid.apache.org/README.md | 105 ++++
.../bindings/go/src/qpid.apache.org/amqp/doc.go | 34 ++
.../go/src/qpid.apache.org/amqp/error.go | 66 +++
.../go/src/qpid.apache.org/amqp/interop | 1 +
.../go/src/qpid.apache.org/amqp/interop_test.go | 381 +++++++++++++
.../go/src/qpid.apache.org/amqp/marshal.go | 250 +++++++++
.../go/src/qpid.apache.org/amqp/message.go | 347 ++++++++++++
.../go/src/qpid.apache.org/amqp/message_test.go | 166 ++++++
.../go/src/qpid.apache.org/amqp/types.go | 198 +++++++
.../go/src/qpid.apache.org/amqp/unmarshal.go | 556 +++++++++++++++++++
.../bindings/go/src/qpid.apache.org/amqp/url.go | 96 ++++
.../go/src/qpid.apache.org/amqp/url_test.go | 51 ++
.../src/qpid.apache.org/electron/connection.go | 192 +++++++
.../src/qpid.apache.org/electron/container.go | 71 +++
.../go/src/qpid.apache.org/electron/doc.go | 57 ++
.../go/src/qpid.apache.org/electron/endpoint.go | 68 +++
.../go/src/qpid.apache.org/electron/handler.go | 176 ++++++
.../go/src/qpid.apache.org/electron/link.go | 242 ++++++++
.../qpid.apache.org/electron/messaging_test.go | 412 ++++++++++++++
.../go/src/qpid.apache.org/electron/receiver.go | 232 ++++++++
.../go/src/qpid.apache.org/electron/sender.go | 319 +++++++++++
.../go/src/qpid.apache.org/electron/session.go | 98 ++++
.../go/src/qpid.apache.org/electron/time.go | 81 +++
.../go/src/qpid.apache.org/internal/error.go | 118 ++++
.../src/qpid.apache.org/internal/flexchannel.go | 82 +++
.../internal/flexchannel_test.go | 89 +++
.../go/src/qpid.apache.org/internal/safemap.go | 57 ++
.../go/src/qpid.apache.org/internal/uuid.go | 70 +++
.../go/src/qpid.apache.org/proton/README.md | 12 -
.../go/src/qpid.apache.org/proton/amqp/doc.go | 34 --
.../go/src/qpid.apache.org/proton/amqp/error.go | 66 ---
.../go/src/qpid.apache.org/proton/amqp/interop | 1 -
.../qpid.apache.org/proton/amqp/interop_test.go | 381 -------------
.../src/qpid.apache.org/proton/amqp/marshal.go | 250 ---------
.../src/qpid.apache.org/proton/amqp/message.go | 347 ------------
.../qpid.apache.org/proton/amqp/message_test.go | 166 ------
.../go/src/qpid.apache.org/proton/amqp/types.go | 198 -------
.../qpid.apache.org/proton/amqp/unmarshal.go | 556 -------------------
.../go/src/qpid.apache.org/proton/amqp/url.go | 96 ----
.../src/qpid.apache.org/proton/amqp/url_test.go | 51 --
.../proton/concurrent/connection.go | 213 -------
.../proton/concurrent/container.go | 71 ---
.../qpid.apache.org/proton/concurrent/doc.go | 46 --
.../proton/concurrent/endpoint.go | 87 ---
.../proton/concurrent/handler.go | 137 -----
.../qpid.apache.org/proton/concurrent/link.go | 232 --------
.../proton/concurrent/messaging_test.go | 205 -------
.../proton/concurrent/receiver.go | 241 --------
.../qpid.apache.org/proton/concurrent/sender.go | 190 -------
.../proton/concurrent/session.go | 114 ----
.../qpid.apache.org/proton/concurrent/time.go | 71 ---
.../go/src/qpid.apache.org/proton/doc.go | 49 +-
.../go/src/qpid.apache.org/proton/engine.go | 218 ++++----
.../go/src/qpid.apache.org/proton/handlers.go | 22 +-
.../qpid.apache.org/proton/internal/error.go | 121 ----
.../proton/internal/flexchannel.go | 82 ---
.../proton/internal/flexchannel_test.go | 89 ---
.../qpid.apache.org/proton/internal/safemap.go | 57 --
.../src/qpid.apache.org/proton/internal/uuid.go | 70 ---
.../go/src/qpid.apache.org/proton/message.go | 17 +-
.../go/src/qpid.apache.org/proton/wrappers.go | 25 +-
.../src/qpid.apache.org/proton/wrappers_gen.go | 2 +-
76 files changed, 5534 insertions(+), 4982 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/go/CMakeLists.txt b/examples/go/CMakeLists.txt
index 873180d..1b68ebe 100644
--- a/examples/go/CMakeLists.txt
+++ b/examples/go/CMakeLists.txt
@@ -18,10 +18,11 @@
#
if(BUILD_GO)
- set(examples receive send broker)
+ set(examples electron/receive electron/send electron/broker proton/broker)
foreach(example ${examples})
- add_custom_target(go-example-${example} ALL
+ string(REPLACE / - target ${example})
+ add_custom_target(go-example-${target} ALL
COMMAND ${GO_BUILD} ${GO_EXAMPLE_FLAGS} -o ${CMAKE_CURRENT_BINARY_DIR}/${example} ${CMAKE_CURRENT_SOURCE_DIR}/${example}.go
DEPENDS go-packages qpid-proton)
endforeach()
@@ -30,8 +31,14 @@ if(BUILD_GO)
DEPENDS go-packages qpid-proton)
add_test(
- NAME go_example_test
- COMMAND ${CMAKE_CURRENT_BINARY_DIR}/example_test -broker broker)
+ NAME go_example_electron_test
+ COMMAND ${CMAKE_CURRENT_BINARY_DIR}/example_test -broker broker
+ WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/electron)
+
+ add_test(
+ NAME go_example_proton_test
+ COMMAND ${CMAKE_CURRENT_BINARY_DIR}/example_test -broker ../proton/broker
+ WORKING_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/electron)
list(APPEND ADDITIONAL_MAKE_CLEAN_FILES ${examples})
endif()
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/README.md
----------------------------------------------------------------------
diff --git a/examples/go/README.md b/examples/go/README.md
index c0bfd85..0114d0e 100644
--- a/examples/go/README.md
+++ b/examples/go/README.md
@@ -2,22 +2,30 @@
There are 3 Go packages for proton:
-- qpid.apache.org/proton/concurrent: Easy-to-use, concurrent API for concurrent clients and servers.
-- qpid.apache.org/proton/amqp: Convert AMQP messages and data to and from Go data types.
+- qpid.apache.org/electron: Concurrent, procedural API for messaging clients and servers.
- qpid.apache.org/proton: Direct access to the event-driven, concurrent-unsafe proton library.
+- qpid.apache.org/amqp: Convert AMQP messages and data to and from Go data types.
-Most applications should use the `concurrent` package. The `proton` package is
-for applications that need low-level access to the proton library.
+`proton` and `electron` are alternative APIs for sending messages. `proton` is a
+direct wrapping of the concurrent-unsafe, event-driven C proton API. `electron`
+is a procedural, concurrent-safe interface that may be more convenient and
+familiar for Go programmers. The examples `proton/broker.go` and
+`electron/broker.go` give an illustration of how the APIs differ.
## Example programs
-- [receive.go](receive.go) receive from many connections concurrently.
-- [send.go](send.go) send to many connections concurrently.
+electron
+- [receive.go](electron/receive.go) receive from many connections concurrently.
+- [send.go](electron/send.go) send to many connections concurrently.
+- [broker.go](electron/broker.go) a simple broker using the electron API
+
+proton
+- [broker.go](proton/broker.go) a simple broker using the proton API
## Using the Go packages
-Use `go get qpid.apache.org/proton/concurrent` or check out the proton
-repository and set your GOPATH environment variable to include
+Use `go get qpid.apache.org/electron` or check out the proton repository and set
+your GOPATH environment variable to include
`/<path-to-proton>/proton-c/bindings/go`
The proton Go packages include C code so the cgo compiler needs to be able to
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/broker.go b/examples/go/broker.go
deleted file mode 100644
index 47d0a76..0000000
--- a/examples/go/broker.go
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-//
-// This is a simple AMQP broker implemented using the concurrent interface.
-//
-// It maintains a set of named in-memory queues of messages. Clients can send
-// messages to queues or subscribe to receive messages from them.
-//
-//
-
-package main
-
-import (
- "./util"
- "flag"
- "fmt"
- "log"
- "net"
- "os"
- "qpid.apache.org/proton/concurrent"
-)
-
-// Usage and command-line flags
-func usage() {
- fmt.Fprintf(os.Stderr, `
-Usage: %s
-A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
-`, os.Args[0])
- flag.PrintDefaults()
-}
-
-var addr = flag.String("addr", ":amqp", "Listening address")
-
-func main() {
- flag.Usage = usage
- flag.Parse()
- b := newBroker()
- err := b.listen(*addr)
- util.ExitIf(err)
-}
-
-type broker struct {
- container concurrent.Container
- queues util.QueueMap
-}
-
-func newBroker() *broker {
- return &broker{
- container: concurrent.NewContainer(""),
- queues: util.MakeQueueMap(),
- }
-}
-
-// Listen for incoming connections
-func (b *broker) listen(addr string) (err error) {
- listener, err := net.Listen("tcp", addr)
- if err != nil {
- return err
- }
- log.Printf("Listening on %s\n", listener.Addr())
- defer listener.Close()
- for {
- conn, err := listener.Accept()
- if err != nil {
- return err
- }
- c, err := b.container.Connection(conn)
- if err != nil {
- return err
- }
- // Make this a server connection. Must be done before Open()
- c.Server() // Server-side protocol negotiation.
- c.Listen() // Enable remotely-opened endpoints.
- if err := c.Open(); err != nil {
- return err
- }
- util.Debugf("accept %s\n", c)
- // Accept remotely-opened endpoints on the connection
- go b.accept(c)
- }
-}
-
-// accept remotely-opened endpoints (Session, Sender and Receiver)
-func (b *broker) accept(c concurrent.Connection) {
- for ep, err := c.Accept(); err == nil; ep, err = c.Accept() {
- switch ep := ep.(type) {
- case concurrent.Session:
- util.Debugf("accept session %s\n", ep)
- ep.Open()
- case concurrent.Sender:
- util.Debugf("accept sender %s\n", ep)
- ep.Open()
- go b.sender(ep)
- case concurrent.Receiver:
- util.Debugf("accept receiver %s\n", ep)
- ep.SetCapacity(100, true) // Pre-fetch 100 messages
- ep.Open()
- go b.receiver(ep)
- }
- }
-}
-
-// sender pops from a the queue in the sender's Source address and send messages.
-func (b *broker) sender(sender concurrent.Sender) {
- qname := sender.Settings().Source
- if qname == "" {
- log.Printf("invalid consumer, no source address: %s", sender)
- return
- }
- q := b.queues.Get(qname)
- for {
- m := <-q.Pop
- if m == nil {
- break
- }
- if sm, err := sender.Send(m); err == nil {
- sm.Forget() // FIXME aconway 2015-09-24: Ignore acknowledgements
- util.Debugf("send %s: %s\n", sender, util.FormatMessage(m))
- } else {
- util.Debugf("send error %s: %s\n", sender, err)
- q.Putback <- m
- break
- }
- }
-}
-
-func (b *broker) receiver(receiver concurrent.Receiver) {
- qname := receiver.Settings().Target
- if qname == "" {
- log.Printf("invalid producer, no target address: %s", receiver)
- return
- }
- q := b.queues.Get(qname)
- for {
- if rm, err := receiver.Receive(); err == nil {
- util.Debugf("recv %s: %s\n", receiver, util.FormatMessage(rm.Message))
- q.Push <- rm.Message
- rm.Accept()
- } else {
- util.Debugf("recv error %s: %s\n", receiver, err)
- break
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
new file mode 100644
index 0000000..4b877df
--- /dev/null
+++ b/examples/go/electron/broker.go
@@ -0,0 +1,148 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+//
+// This is a simple AMQP broker implemented using the procedural electron package.
+//
+// It maintains a set of named in-memory queues of messages. Clients can send
+// messages to queues or subscribe to receive messages from them.
+//
+
+package main
+
+import (
+ "../util"
+ "flag"
+ "fmt"
+ "net"
+ "os"
+ "qpid.apache.org/electron"
+)
+
+// Usage and command-line flags
+func usage() {
+ fmt.Fprintf(os.Stderr, `
+Usage: %s
+A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
+`, os.Args[0])
+ flag.PrintDefaults()
+}
+
+var addr = flag.String("addr", ":amqp", "Listening address")
+var credit = flag.Int("credit", 100, "Receiver credit window")
+var qsize = flag.Int("qsize", 1000, "Max queue size")
+
+func main() {
+ flag.Usage = usage
+ flag.Parse()
+
+ b := newBroker()
+ listener, err := net.Listen("tcp", *addr)
+ util.ExitIf(err)
+ defer listener.Close()
+ fmt.Printf("Listening on %s\n", listener.Addr())
+
+ // Loop accepting new connections.
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ util.Debugf("Accept error: %s", err)
+ continue
+ }
+ if err := b.connection(conn); err != nil {
+ if err != nil {
+ util.Debugf("Connection error: %s", err)
+ continue
+ }
+ }
+ }
+}
+
+type broker struct {
+ queues util.Queues
+ container electron.Container
+}
+
+func newBroker() *broker {
+ return &broker{util.MakeQueues(*qsize), electron.NewContainer("")}
+}
+
+// connection creates a new AMQP connection for a net.Conn.
+func (b *broker) connection(conn net.Conn) error {
+ c, err := b.container.Connection(conn)
+ if err != nil {
+ return err
+ }
+ c.Server() // Enable server-side protocol negotiation.
+ c.Listen(b.accept) // Call accept() for remotely-opened endpoints.
+ if err := c.Open(); err != nil {
+ return err
+ }
+ util.Debugf("Accepted %s", c)
+ return nil
+}
+
+// accept remotely-opened endpoints (Session, Sender and Receiver)
+// and start goroutines to service them.
+func (b *broker) accept(ep electron.Endpoint) error {
+ switch ep := ep.(type) {
+ case electron.Sender:
+ util.Debugf("%s opened", ep)
+ go b.sender(ep)
+ case electron.Receiver:
+ util.Debugf("%s opened", ep)
+ ep.SetCapacity(100, true) // Pre-fetch 100 messages
+ go b.receiver(ep)
+ }
+ return nil
+}
+
+// sender pops messages from a queue and sends them.
+func (b *broker) sender(sender electron.Sender) {
+ q := b.queues.Get(sender.Source())
+ for {
+ m, ok := <-q
+ if !ok { // Queue closed
+ return
+ }
+ if err := sender.SendForget(m); err == nil {
+ util.Debugf("send %s: %s", sender, util.FormatMessage(m))
+ } else {
+ util.Debugf("send error %s: %s", sender, err)
+ q <- m // Put it back on the queue.
+ break
+ }
+ }
+}
+
+// receiver receives messages and pushes to the queue named by the receivers's
+// Target address
+func (b *broker) receiver(receiver electron.Receiver) {
+ q := b.queues.Get(receiver.Target())
+ for {
+ if rm, err := receiver.Receive(); err == nil {
+ util.Debugf("%s: received %s", receiver, util.FormatMessage(rm.Message))
+ q <- rm.Message
+ rm.Accept()
+ } else {
+ util.Debugf("%s: error %s", receiver, err)
+ break
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
new file mode 100644
index 0000000..7639375
--- /dev/null
+++ b/examples/go/electron/receive.go
@@ -0,0 +1,122 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+ "../util"
+ "flag"
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "path"
+ "qpid.apache.org/amqp"
+ "qpid.apache.org/electron"
+ "sync"
+)
+
+// Usage and command-line flags
+func usage() {
+ fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
+Receive messages from all the listed URLs concurrently and print them.
+`, os.Args[0])
+ flag.PrintDefaults()
+}
+
+var count = flag.Uint64("count", 1, "Stop after receiving this many messages.")
+
+func main() {
+ flag.Usage = usage
+ flag.Parse()
+
+ urls := flag.Args() // Non-flag arguments are URLs to receive from
+ if len(urls) == 0 {
+ log.Println("No URL provided")
+ usage()
+ os.Exit(1)
+ }
+
+ messages := make(chan amqp.Message) // Channel for messages from goroutines to main()
+ stop := make(chan struct{}) // Closing this channel means the program is stopping.
+ var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
+ wait.Add(len(urls)) // Wait for one goroutine per URL.
+
+ _, prog := path.Split(os.Args[0])
+ container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid()))
+ connections := make(chan electron.Connection, len(urls)) // Connections to close on exit
+
+ // Start a goroutine to for each URL to receive messages and send them to the messages channel.
+ // main() receives and prints them.
+ for _, urlStr := range urls {
+ util.Debugf("Connecting to %s\n", urlStr)
+ go func(urlStr string) { // Start the goroutine
+
+ defer wait.Done() // Notify main() when this goroutine is done.
+ url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
+ util.ExitIf(err)
+
+ // Open a new connection
+ conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+ util.ExitIf(err)
+ c, err := container.Connection(conn)
+ util.ExitIf(err)
+ util.ExitIf(c.Open())
+ connections <- c // Save connection so we can Close() when main() ends
+
+ // Create a Receiver using the path of the URL as the source address
+ r, err := c.Receiver(electron.Source(url.Path))
+ util.ExitIf(err)
+
+ // Loop receiving messages and sending them to the main() goroutine
+ for {
+ rm, err := r.Receive()
+ if err == electron.Closed {
+ return
+ }
+ util.ExitIf(err)
+ select { // Send m to main() or stop
+ case messages <- rm.Message: // Send to main()
+ case <-stop: // The program is stopping.
+ return
+ }
+ }
+ }(urlStr)
+ }
+
+ // All goroutines are started, we are receiving messages.
+ fmt.Printf("Listening on %d connections\n", len(urls))
+
+ // print each message until the count is exceeded.
+ for i := uint64(0); i < *count; i++ {
+ m := <-messages
+ util.Debugf("%s\n", util.FormatMessage(m))
+ }
+ fmt.Printf("Received %d messages\n", *count)
+
+ // Close all connections, this will interrupt goroutines blocked in Receiver.Receive()
+ for i := 0; i < len(urls); i++ {
+ c := <-connections
+ util.Debugf("close %s", c)
+ c.Close(nil)
+ }
+ close(stop) // Signal all goroutines to stop.
+ wait.Wait() // Wait for all goroutines to finish.
+ close(messages)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
new file mode 100644
index 0000000..94a77e7
--- /dev/null
+++ b/examples/go/electron/send.go
@@ -0,0 +1,123 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package main
+
+import (
+ "../util"
+ "flag"
+ "fmt"
+ "log"
+ "net"
+ "os"
+ "path"
+ "qpid.apache.org/amqp"
+ "qpid.apache.org/electron"
+ "sync"
+)
+
+// Usage and command-line flags
+func usage() {
+ fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
+Send messages to each URL concurrently with body "<url-path>-<n>" where n is the message number.
+`, os.Args[0])
+ flag.PrintDefaults()
+}
+
+var count = flag.Int64("count", 1, "Send this may messages per address.")
+
+type sent struct {
+ name string
+ sentMessage electron.SentMessage
+}
+
+func main() {
+ flag.Usage = usage
+ flag.Parse()
+
+ urls := flag.Args() // Non-flag arguments are URLs to receive from
+ if len(urls) == 0 {
+ log.Println("No URL provided")
+ flag.Usage()
+ os.Exit(1)
+ }
+
+ sentChan := make(chan sent) // Channel to receive all the delivery receipts.
+ var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
+ wait.Add(len(urls)) // Wait for one goroutine per URL.
+
+ _, prog := path.Split(os.Args[0])
+ container := electron.NewContainer(fmt.Sprintf("%v:%v", prog, os.Getpid()))
+ var connections []electron.Connection // Store connctions to close on exit
+
+ // Start a goroutine for each URL to send messages.
+ for _, urlStr := range urls {
+ util.Debugf("Connecting to %v\n", urlStr)
+ go func(urlStr string) {
+
+ defer wait.Done() // Notify main() that this goroutine is done.
+ url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
+ util.ExitIf(err)
+
+ // Open a new connection
+ conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
+ util.ExitIf(err)
+ c, err := container.Connection(conn)
+ util.ExitIf(err)
+ err = c.Open()
+ util.ExitIf(err)
+ connections = append(connections, c) // Save connection so it will be closed when main() ends
+
+ // Create a Sender using the path of the URL as the AMQP address
+ s, err := c.Sender(electron.Target(url.Path))
+ util.ExitIf(err)
+
+ // Loop sending messages.
+ for i := int64(0); i < *count; i++ {
+ m := amqp.NewMessage()
+ body := fmt.Sprintf("%v-%v", url.Path, i)
+ m.Marshal(body)
+ sentMessage, err := s.Send(m)
+ util.ExitIf(err)
+ sentChan <- sent{body, sentMessage}
+ }
+ }(urlStr)
+ }
+
+ // Wait for all the acknowledgements
+ expect := int(*count) * len(urls)
+ util.Debugf("Started senders, expect %v acknowledgements\n", expect)
+ for i := 0; i < expect; i++ {
+ d := <-sentChan
+ disposition, err := d.sentMessage.Disposition()
+ if err != nil {
+ util.Debugf("acknowledgement[%v] %v error: %v\n", i, d.name, err)
+ } else {
+ util.Debugf("acknowledgement[%v] %v (%v)\n", i, d.name, disposition)
+ }
+ }
+ fmt.Printf("Received all %v acknowledgements\n", expect)
+
+ wait.Wait() // Wait for all goroutines to finish.
+ for _, c := range connections { // Close all connections
+ if c != nil {
+ c.Close(nil)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/example_test.go
----------------------------------------------------------------------
diff --git a/examples/go/example_test.go b/examples/go/example_test.go
index 2afd95c..a4d7d80 100644
--- a/examples/go/example_test.go
+++ b/examples/go/example_test.go
@@ -110,14 +110,14 @@ func checkEqual(want interface{}, got interface{}) error {
// 'go build' uses the installed copy of the proton Go libraries, which may be out of date.
func checkStaleLibs(t *testing.T) {
var stale []string
- pp := "qpid.apache.org/proton"
- for _, p := range []string{pp, pp + "/amqp", pp + "/concurrent"} {
+ pp := "qpid.apache.org"
+ for _, p := range []string{pp + "/proton", pp + "/amqp", pp + "/electron"} {
out, err := exec.Command("go", "list", "-f", "{{.Stale}}", p).CombinedOutput()
if err != nil {
t.Fatalf("failed to execute 'go list': %v\n%v", err, string(out))
}
if string(out) != "false\n" {
- stale = append(stale, pp)
+ stale = append(stale, p)
}
}
if len(stale) > 0 {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/proton/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go
new file mode 100644
index 0000000..dbb4a82
--- /dev/null
+++ b/examples/go/proton/broker.go
@@ -0,0 +1,299 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+//
+// This is a simple AMQP broker implemented using the event-driven proton package.
+//
+// It maintains a set of named in-memory queues of messages. Clients can send
+// messages to queues or subscribe to receive messages from them.
+//
+
+package main
+
+import (
+ "../util"
+ "flag"
+ "fmt"
+ "io"
+ "net"
+ "os"
+ "qpid.apache.org/amqp"
+ "qpid.apache.org/proton"
+)
+
+// Usage and command-line flags
+func usage() {
+ fmt.Fprintf(os.Stderr, `
+Usage: %s
+A simple broker-like demo. Queues are created automatically for sender or receiver addrsses.
+`, os.Args[0])
+ flag.PrintDefaults()
+}
+
+var addr = flag.String("addr", ":amqp", "Listening address")
+var credit = flag.Int("credit", 100, "Receiver credit window")
+var qsize = flag.Int("qsize", 1000, "Max queue size")
+
+func main() {
+ flag.Usage = usage
+ flag.Parse()
+
+ b := newBroker()
+ listener, err := net.Listen("tcp", *addr)
+ util.ExitIf(err)
+ defer listener.Close()
+ fmt.Printf("Listening on %s\n", listener.Addr())
+
+ // Loop accepting new connections.
+ for {
+ conn, err := listener.Accept()
+ if err != nil {
+ util.Debugf("Accept error: %s", err)
+ continue
+ }
+ if err := b.connection(conn); err != nil {
+ if err != nil {
+ util.Debugf("Connection error: %s", err)
+ continue
+ }
+ }
+ }
+}
+
+type broker struct {
+ queues util.Queues
+}
+
+func newBroker() *broker {
+ return &broker{util.MakeQueues(*qsize)}
+}
+
+// connection creates a new AMQP connection for a net.Conn.
+func (b *broker) connection(conn net.Conn) error {
+ delegator := proton.NewMessagingDelegator(newHandler(&b.queues, *credit))
+ // We want to accept messages when they are enqueued, not just when they
+ // are received, so we turn off auto-accept and prefetch by the handler.
+ delegator.Prefetch = 0
+ delegator.AutoAccept = false
+ engine, err := proton.NewEngine(conn, delegator)
+ if err != nil {
+ return err
+ }
+ engine.Server() // Enable server-side protocol negotiation.
+ go func() { // Start goroutine to run the engine event loop
+ engine.Run()
+ util.Debugf("Closed %s", engine)
+ }()
+ util.Debugf("Accepted %s", engine)
+ return nil
+}
+
+// receiver is a channel to buffer messages waiting to go on the queue.
+type receiver chan receivedMessage
+
+// receivedMessage is a message and the corresponding delivery for acknowledgement.
+type receivedMessage struct {
+ delivery proton.Delivery
+ message amqp.Message
+}
+
+// sender is a signal channel, closed when we are done sending.
+type sender chan struct{}
+
+// handler handles AMQP events. There is one handler per connection. The
+// handler does not need to be concurrent-safe as proton will serialize all
+// calls to a handler. We will use channels to communicate from the handler
+// to goroutines sending and receiving messages.
+type handler struct {
+ queues *util.Queues
+ credit int // Credit window for receiver flow control.
+ receivers map[proton.Link]receiver
+ senders map[proton.Link]sender
+}
+
+func newHandler(queues *util.Queues, credit int) *handler {
+ return &handler{
+ queues,
+ credit,
+ make(map[proton.Link]receiver),
+ make(map[proton.Link]sender),
+ }
+}
+
+// Handle an AMQP event.
+func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
+ switch t {
+
+ case proton.MLinkOpening:
+ l := e.Link()
+ var err error
+ if l.IsReceiver() {
+ err = h.receiver(l)
+ } else { // IsSender()
+ err = h.sender(l)
+ }
+ if err == nil {
+ util.Debugf("%s opened", l)
+ } else {
+ util.Debugf("%s open error: %s", l, err)
+ proton.CloseError(l, err)
+ }
+
+ case proton.MLinkClosing:
+ l := e.Link()
+ if r, ok := h.receivers[l]; ok {
+ close(r)
+ delete(h.receivers, l)
+ } else if s, ok := h.senders[l]; ok {
+ close(s)
+ delete(h.senders, l)
+ }
+ util.Debugf("%s closed", l)
+
+ case proton.MSendable:
+ l := e.Link()
+ q := h.queues.Get(l.RemoteSource().Address())
+ if n, err := h.sendAll(e.Link(), q); err == nil && n > 0 {
+ // Still have credit, start a watcher.
+ go h.sendWatch(e.Link(), q)
+ }
+
+ case proton.MMessage:
+ l := e.Link()
+ d := e.Delivery()
+ m, err := d.Message() // Must decode message immediately before link state changes.
+ if err != nil {
+ util.Debugf("%s error decoding message: %s", e.Link(), err)
+ proton.CloseError(l, err)
+ } else {
+ // This will not block, AMQP credit prevents us from overflowing the buffer.
+ h.receivers[l] <- receivedMessage{d, m}
+ util.Debugf("%s received %s", l, util.FormatMessage(m))
+ }
+
+ case proton.MConnectionClosing, proton.MDisconnected:
+ for l, r := range h.receivers {
+ close(r)
+ delete(h.receivers, l)
+ }
+ for l, s := range h.senders {
+ close(s)
+ delete(h.senders, l)
+ }
+ }
+}
+
+// receiver is called by the handler when a receiver link opens.
+//
+// It sets up data structures in the handler and then starts a goroutine
+// to receive messages and put them on a queue.
+func (h *handler) receiver(l proton.Link) error {
+ q := h.queues.Get(l.RemoteTarget().Address())
+ buffer := make(receiver, h.credit)
+ h.receivers[l] = buffer
+ l.Flow(cap(buffer)) // credit==cap(buffer) so we won't overflow the buffer.
+ go h.runReceive(l, buffer, q)
+ return nil
+}
+
+// runReceive moves messages from buffer to queue
+func (h *handler) runReceive(l proton.Link, buffer receiver, q util.Queue) {
+ for rm := range buffer {
+ q <- rm.message
+ rm2 := rm // Save in temp var for injected closure
+ err := l.Connection().Injecter().Inject(func() {
+ rm2.delivery.Accept()
+ l.Flow(1)
+ })
+ if err != nil {
+ util.Debugf("%s receive error: %s", l, err)
+ proton.CloseError(l, err)
+ }
+ }
+}
+
+// sender is called by the handler when a sender link opens.
+// It sets up a sender structures in the handler.
+func (h *handler) sender(l proton.Link) error {
+ h.senders[l] = make(sender)
+ return nil
+}
+
+// send one message in handler context, assumes we have credit.
+func (h *handler) send(l proton.Link, m amqp.Message, q util.Queue) error {
+ delivery, err := l.Send(m)
+ if err != nil {
+ h.closeSender(l, err)
+ return err
+ }
+ delivery.Settle() // Pre-settled, unreliable.
+ util.Debugf("%s sent %s", l, util.FormatMessage(m))
+ return nil
+}
+
+// sendAll sends as many messages as possible without blocking, call in handler context.
+// Returns the number of credits left, >0 means we ran out of messages.
+func (h *handler) sendAll(l proton.Link, q util.Queue) (int, error) {
+ for l.Credit() > 0 {
+ select {
+ case m, ok := <-q:
+ if ok { // Got a message
+ if err := h.send(l, m, q); err != nil {
+ return 0, err
+ }
+ } else { // Queue is closed
+ l.Close()
+ return 0, io.EOF
+ }
+ default: // Queue empty
+ return l.Credit(), nil
+ }
+ }
+ return l.Credit(), nil
+}
+
+// sendWatch watches the queue for more messages and re-runs sendAll.
+// Run in a separate goroutine, so must inject handler functions.
+func (h *handler) sendWatch(l proton.Link, q util.Queue) {
+ select {
+ case m, ok := <-q:
+ l.Connection().Injecter().Inject(func() {
+ if ok {
+ if h.send(l, m, q) != nil {
+ return
+ }
+ if n, err := h.sendAll(l, q); err != nil {
+ return
+ } else if n > 0 {
+ go h.sendWatch(l, q) // Start a new watcher.
+ }
+ }
+ })
+ case <-h.senders[l]: // Closed
+ return
+ }
+}
+
+// closeSender closes a sender link and signals goroutines processing that sender.
+func (h *handler) closeSender(l proton.Link, err error) {
+ util.Debugf("%s sender closed: %s", l, err)
+ proton.CloseError(l, err)
+ close(h.senders[l])
+ delete(h.senders, l)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
deleted file mode 100644
index 86244d7..0000000
--- a/examples/go/receive.go
+++ /dev/null
@@ -1,119 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package main
-
-import (
- "./util"
- "flag"
- "fmt"
- "log"
- "net"
- "os"
- "qpid.apache.org/proton/amqp"
- "qpid.apache.org/proton/concurrent"
- "sync"
-)
-
-// Usage and command-line flags
-func usage() {
- fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
-Receive messages from all the listed URLs concurrently and print them.
-`, os.Args[0])
- flag.PrintDefaults()
-}
-
-var count = flag.Uint64("count", 1, "Stop after receiving this many messages.")
-
-func main() {
- flag.Usage = usage
- flag.Parse()
-
- urls := flag.Args() // Non-flag arguments are URLs to receive from
- if len(urls) == 0 {
- log.Println("No URL provided")
- usage()
- os.Exit(1)
- }
-
- messages := make(chan amqp.Message) // Channel for messages from goroutines to main()
- stop := make(chan struct{}) // Closing this channel means the program is stopping.
- var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
- wait.Add(len(urls)) // Wait for one goroutine per URL.
-
- container := concurrent.NewContainer("")
- connections := make(chan concurrent.Connection, len(urls)) // Connections to close on exit
-
- // Start a goroutine to for each URL to receive messages and send them to the messages channel.
- // main() receives and prints them.
- for _, urlStr := range urls {
- util.Debugf("Connecting to %s\n", urlStr)
- go func(urlStr string) { // Start the goroutine
-
- defer wait.Done() // Notify main() when this goroutine is done.
- url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
- util.ExitIf(err)
-
- // Open a new connection
- conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
- util.ExitIf(err)
- c, err := container.Connection(conn)
- util.ExitIf(err)
- util.ExitIf(c.Open())
- connections <- c // Save connection so we can Close() when main() ends
-
- // Create a Receiver using the path of the URL as the source address
- r, err := c.Receiver(url.Path)
- util.ExitIf(err)
-
- // Loop receiving messages and sending them to the main() goroutine
- for {
- rm, err := r.Receive()
- if err == concurrent.Closed {
- return
- }
- util.ExitIf(err)
- select { // Send m to main() or stop
- case messages <- rm.Message: // Send to main()
- case <-stop: // The program is stopping.
- return
- }
- }
- }(urlStr)
- }
-
- // All goroutines are started, we are receiving messages.
- fmt.Printf("Listening on %d connections\n", len(urls))
-
- // print each message until the count is exceeded.
- for i := uint64(0); i < *count; i++ {
- m := <-messages
- util.Debugf("%s\n", util.FormatMessage(m))
- }
- fmt.Printf("Received %d messages\n", *count)
-
- // Close all connections, this will interrupt goroutines blocked in Receiver.Receive()
- for i := 0; i < len(urls); i++ {
- c := <-connections
- c.Disconnect(nil) // FIXME aconway 2015-09-25: Close
- }
- close(stop) // Signal all goroutines to stop.
- wait.Wait() // Wait for all goroutines to finish.
- close(messages)
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
deleted file mode 100644
index edac2ae..0000000
--- a/examples/go/send.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package main
-
-import (
- "./util"
- "flag"
- "fmt"
- "log"
- "net"
- "os"
- "qpid.apache.org/proton/amqp"
- "qpid.apache.org/proton/concurrent"
- "sync"
-)
-
-// Usage and command-line flags
-func usage() {
- fmt.Fprintf(os.Stderr, `Usage: %s url [url ...]
-Send messages to each URL concurrently with body "<url-path>-<n>" where n is the message number.
-`, os.Args[0])
- flag.PrintDefaults()
-}
-
-var count = flag.Int64("count", 1, "Send this may messages per address.")
-
-type sent struct {
- name string
- sentMessage concurrent.SentMessage
-}
-
-func main() {
- flag.Usage = usage
- flag.Parse()
-
- urls := flag.Args() // Non-flag arguments are URLs to receive from
- if len(urls) == 0 {
- log.Println("No URL provided")
- flag.Usage()
- os.Exit(1)
- }
-
- sentChan := make(chan sent) // Channel to receive all the delivery receipts.
- var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
- wait.Add(len(urls)) // Wait for one goroutine per URL.
-
- container := concurrent.NewContainer("")
- var connections []concurrent.Connection // Store connctions to close on exit
-
- // Start a goroutine for each URL to send messages.
- for _, urlStr := range urls {
- util.Debugf("Connecting to %v\n", urlStr)
- go func(urlStr string) {
-
- defer wait.Done() // Notify main() that this goroutine is done.
- url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
- util.ExitIf(err)
-
- // Open a new connection
- conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
- util.ExitIf(err)
- c, err := container.Connection(conn)
- util.ExitIf(err)
- err = c.Open()
- util.ExitIf(err)
- connections = append(connections, c) // Save connection so it will be closed when main() ends
-
- // Create a Sender using the path of the URL as the AMQP address
- s, err := c.Sender(url.Path)
- util.ExitIf(err)
-
- // Loop sending messages.
- for i := int64(0); i < *count; i++ {
- m := amqp.NewMessage()
- body := fmt.Sprintf("%v-%v", url.Path, i)
- m.Marshal(body)
- sentMessage, err := s.Send(m)
- util.ExitIf(err)
- sentChan <- sent{body, sentMessage}
- }
- }(urlStr)
- }
-
- // Wait for all the acknowledgements
- expect := int(*count) * len(urls)
- util.Debugf("Started senders, expect %v acknowledgements\n", expect)
- for i := 0; i < expect; i++ {
- d := <-sentChan
- disposition, err := d.sentMessage.Disposition()
- if err != nil {
- util.Debugf("acknowledgement[%v] %v error: %v\n", i, d.name, err)
- } else {
- util.Debugf("acknowledgement[%v] %v (%v)\n", i, d.name, disposition)
- }
- }
- fmt.Printf("Received all %v acknowledgements\n", expect)
-
- wait.Wait() // Wait for all goroutines to finish.
- for _, c := range connections { // Close all connections
- if c != nil {
- c.Close(nil)
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/util/queue.go
----------------------------------------------------------------------
diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go
index 075c4d2..d844c0d 100644
--- a/examples/go/util/queue.go
+++ b/examples/go/util/queue.go
@@ -20,87 +20,32 @@ under the License.
package util
import (
- "container/list"
- "qpid.apache.org/proton/amqp"
+ "qpid.apache.org/amqp"
"sync"
)
-// Queue is a concurrent-safe queue of amqp.Message.
-type Queue struct {
- name string
- messages list.List // List of amqp.Message
- // Send to Push to push a message onto back of queue
- Push chan amqp.Message
- // Receive from Pop to pop a message from the front of the queue.
- Pop chan amqp.Message
- // Send to Putback to put an unsent message back on the front of the queue.
- Putback chan amqp.Message
-}
+// Use a buffered channel as a very simple queue.
+type Queue chan amqp.Message
-func NewQueue(name string) *Queue {
- q := &Queue{
- name: name,
- Push: make(chan amqp.Message),
- Pop: make(chan amqp.Message),
- Putback: make(chan amqp.Message),
- }
- go q.run()
- return q
+// Concurrent-safe map of queues.
+type Queues struct {
+ queueSize int
+ m map[string]Queue
+ lock sync.Mutex
}
-// Close the queue. Any remaining messages on Pop can still be received.
-func (q *Queue) Close() { close(q.Push); close(q.Putback) }
-
-// Run runs the queue, returns when q.Close() is called.
-func (q *Queue) run() {
- defer close(q.Pop)
- for {
- var pop chan amqp.Message
- var front amqp.Message
- if el := q.messages.Front(); el != nil {
- front = el.Value.(amqp.Message)
- pop = q.Pop // Only select for pop if there is something to pop.
- }
- select {
- case m, ok := <-q.Push:
- if !ok {
- return
- }
- Debugf("%s push: %s\n", q.name, FormatMessage(m))
- q.messages.PushBack(m)
- case m, ok := <-q.Putback:
- Debugf("%s put-back: %s\n", q.name, FormatMessage(m))
- if !ok {
- return
- }
- q.messages.PushFront(m)
- case pop <- front:
- Debugf("%s pop: %s\n", q.name, FormatMessage(front))
- q.messages.Remove(q.messages.Front())
- }
- }
+func MakeQueues(queueSize int) Queues {
+ return Queues{queueSize: queueSize, m: make(map[string]Queue)}
}
-// QueueMap is a concurrent-safe map of queues that creates new queues
-// on demand.
-type QueueMap struct {
- lock sync.Mutex
- m map[string]*Queue
-}
-
-func MakeQueueMap() QueueMap { return QueueMap{m: make(map[string]*Queue)} }
-
-func (qm *QueueMap) Get(name string) *Queue {
- if name == "" {
- panic("Attempt to get queue with no name")
- }
- qm.lock.Lock()
- defer qm.lock.Unlock()
- q := qm.m[name]
+// Create a queue if not found.
+func (qs *Queues) Get(name string) Queue {
+ qs.lock.Lock()
+ defer qs.lock.Unlock()
+ q := qs.m[name]
if q == nil {
- q = NewQueue(name)
- qm.m[name] = q
- Debugf("queue %s create", name)
+ q = make(Queue, qs.queueSize)
+ qs.m[name] = q
}
return q
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/examples/go/util/util.go
----------------------------------------------------------------------
diff --git a/examples/go/util/util.go b/examples/go/util/util.go
index 72c6646..f158386 100644
--- a/examples/go/util/util.go
+++ b/examples/go/util/util.go
@@ -27,7 +27,7 @@ import (
"log"
"os"
"path"
- "qpid.apache.org/proton/amqp"
+ "qpid.apache.org/amqp"
)
// Debug flag "-debug" enables debug output with Debugf
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/CMakeLists.txt b/proton-c/bindings/go/CMakeLists.txt
index 0631eae..d24bf2e 100644
--- a/proton-c/bindings/go/CMakeLists.txt
+++ b/proton-c/bindings/go/CMakeLists.txt
@@ -45,8 +45,8 @@ if (BUILD_GO)
# Install packages in the source tree, go tools aren't friendly otherwise.
# All build output goes in git-ignored pkg or bin subdirectories.
- set(qgo "qpid.apache.org/proton")
- set(packages ${qgo} ${qgo}/amqp ${qgo}/concurrent ${qgo}/internal)
+ set(q "qpid.apache.org")
+ set(packages ${q}/amqp ${q}/internal ${q}/proton ${q}/electron)
add_custom_target(go-packages ALL
COMMAND ${GO_INSTALL} ${packages}
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md
deleted file mode 100644
index 14ccf0f..0000000
--- a/proton-c/bindings/go/README.md
+++ /dev/null
@@ -1,150 +0,0 @@
-# Go binding for proton
-
-This is a a [Go](http://golang.org) binding for proton.
-The API is subject to change but is stabilizing.
-
-Feedback is strongly encouraged:
-
-- Email <pr...@qpid.apache.org>
-- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.
-
-The package documentation is available at: <http://godoc.org/qpid.apache.org/proton>
-
-See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/cpp/README.mdw)
-for working examples and practical instructions on how to get started.
-
-The rest of this page discusses the high-level goals and design issues.
-
-## Goals
-
-The API should
-
-- be idiomatic, unsurprising, and easy to use for Go developers.
-- support client and server development.
-- make simple tasks simple.
-- provide deep access to AMQP protocol when that is required.
-
-There are two types of developer we want to support
-
-1. Go developers using AMQP as a message transport:
- - Straightforward conversions between Go built-in types and AMQP types.
- - Easy message exchange via Go channels to support use in goroutines.
-
-2. AMQP-aware developers using Go as an implementation language:
- - Go types to exactly represent all AMQP types and encoding details.
- - Full access to detailed AMQP concepts: sessions, links, deliveries etc.
-
-## Status
-
-There are 3 go packages for proton:
-
-- qpid.apache.org/proton/amqp: converts AMQP messages and data types to and from Go data types.
-- qpid.apache.org/proton/concurrent: easy-to-use, concurrent API for clients and servers.
-- qpid.apache.org/proton: full low-level access to the proton engine.
-
-The `amqp` package provides conversions between AMQP and Go data types that are
-used by the other two packages.
-
-The `concurrent` package provides a simple procedural API that can be used with
-goroutines to construct concurrent AMQP clients and servers.
-
-The `proton` package is a concurrency-unsafe, event-driven API. It is a very
-thin wrapper providing almost direct access to the underlying proton C API.
-
-The `concurrent` package will probably be more familiar and convenient to Go
-programmers for most use cases. The `proton` package may be more familiar if
-you have used proton in other languages.
-
-Note the `concurrent` package itself is implemented in terms of the `proton`
-package. It takes care of running concurrency-unsafe `proton` code in dedicated
-goroutines and setting up channels to move data between user and proton
-goroutines safely. It hides all this complexity behind a simple procedural
-interface rather than presenting an event-driven interface.
-
-See the [examples](../../../examples/go/README.md) for a better illustration of the APIs.
-
-### Why two APIs?
-
-Go is a concurrent language and encourages applications to be divided into
-concurrent *goroutines*. It provides traditional locking but it encourages the
-use *channels* to communicate between goroutines without explicit locks:
-
- "Share memory by communicating, don't communicate by sharing memory"
-
-The idea is that a given value is only operated on by one goroutine at a time,
-but values can easily be passed from one goroutine to another.
-
-Go literature distinguishes between:
-
-- *concurrency*: "keeping track of things that could be done in parallel"
-- *parallelism*: "actually doing things in parallel"
-
-The application expresses concurrency by starting goroutines for potentially
-concurrent tasks. The Go run-times schedule the activity of goroutines onto a
-small number (possibly one) of actual parallel executions.
-
-Even with *no* parallelism, concurrency lets the Go run-times *order* work with
-respect to events like file descriptors being readable/writable, channels having
-data, timers firing etc. Go automatically takes care of switching out goroutines
-that block or sleep so it is normal to write code in terms of blocking calls.
-
-Event-driven programming (such as poll, epoll, select or the `proton` package)
-also channels unpredictably ordered events to actions in one or a small pool of
-execution threads. However this requires a different style of programming:
-"event-driven" or "reactive" programming. Go developers call it "inside-out"
-programming. In an event-driven architecture blocking is a big problem as it
-consumes a scarce thread of execution, so actions that take time to complete
-have to be re-structured in terms of future event delivery.
-
-The promise of Go is that you can express your application in concurrent,
-procedural terms with simple blocking calls and the Go run-times will turn it
-inside-out for you. Write as many goroutines as you want, and let Go interleave
-and schedule them efficiently.
-
-For example: the Go equivalent of listening for connections is a goroutine with
-a simple endless loop that calls a blocking Listen() function and starts a
-goroutine for each new connection. Each connection has its own goroutine that
-deals with just that connection till it closes.
-
-The benefit is that the variables and logic live closer together. Once you're in
-a goroutine, you have everything you need in local variables, and they are
-preserved across blocking calls. There's no need to store details in context
-objects that you have to look up when handling a later event to figure out how
-to continue where you left off.
-
-The `proton` API is important because it is close to the original proton-C
-reactive API and gives you direct access to the underlying library. However it
-is un-Go-like in it's event-driven nature, and it requires care as methods on
-values associated with the same underlying proton engine are not
-concurrent-safe.
-
-The `concurrent` API hides the event-driven details behind a simple blocking API
-that can be safely called from arbitrary goroutines. Under the covers data is
-passed through channels to dedicated goroutines running separate `proton` event
-loops for each connection.
-
-### Design of the concurrent API
-
-Code from the `proton` package runs _only_ in a dedicated goroutine (per
-connection). This makes it safe to use proton C data structures associated with
-that connection.
-
-Code in the `concurrent` package can run in any goroutine, and holds `proton`
-package values with proton object pointers. To use those values, it "injects" a
-function into the proton goroutine via a special channel. Injected functions
-can use temporary channels to allow the calling code to wait for results. Such
-waiting is only for the local event-loop, not across network calls.
-
-The API exposes blocking calls returning normal error values. The user can write
-simple blocking code or start their own goroutine loops and channels as
-appropriate. Details of our internal channel use and error handling are hidden,
-which simplifies the API and gives us more implementation flexibility.
-
-## New to Go?
-
-If you are new to Go then these are a good place to start:
-
-- [A Tour of Go](http://tour.golang.org)
-- [Effective Go](http://golang.org/doc/effective_go.html)
-
-Then look at the tools and library docs at <http://golang.org> as you need them.
diff --git a/proton-c/bindings/go/README.md b/proton-c/bindings/go/README.md
new file mode 120000
index 0000000..38521ba
--- /dev/null
+++ b/proton-c/bindings/go/README.md
@@ -0,0 +1 @@
+src/qpid.apache.org/README.md
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/README.md b/proton-c/bindings/go/src/qpid.apache.org/README.md
new file mode 100644
index 0000000..b99047d
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/README.md
@@ -0,0 +1,105 @@
+# Qpid Go Libraries for AMQP
+
+These packages provide [Go](http://golang.org) support for sending and receiving AMQP
+messages in client or server applications.
+
+Package documentation is available at: <http://godoc.org/qpid.apache.org/>
+
+See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
+for working examples and practical instructions on how to get started.
+
+Feedback is encouraged at:
+
+- Email <pr...@qpid.apache.org>
+- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.
+
+## Status
+
+There are 3 go packages for proton:
+
+`qpid.apache.org/electron`: procedural, concurrent-safe Go library for AMQP messaging.
+A simple procedural API that can easily be used with goroutines and channels to construct
+concurrent AMQP clients and servers.
+
+`qpid.apache.org/proton`: event-driven, concurrent-unsafe Go library for AMQP messaging.
+A simple port into Go of the Proton C library. Its event-driven, single-threaded nature
+may be off-putting for Go programmers, hence the electron API.
+
+`qpid.apache.org/amqp`: converts AMQP messages and data types to and from Go data types.
+Used by both the proton and electron packages to represent AMQP types.
+
+See the
+[examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
+for an illustration of the APIs, in particular compare `proton/broker.go` and
+`electron/broker.go` which illustrate the different API approaches to the same
+task (a simple broker.)
+
+
+### Why two APIs?
+
+Go is a concurrent language and encourages applications to be divided into
+concurrent *goroutines*. It provides traditional locking but it encourages the
+use *channels* to communicate between goroutines without explicit locks:
+
+ "Share memory by communicating, don't communicate by sharing memory"
+
+The idea is that a given value is only operated on by one goroutine at a time,
+but values can easily be passed from one goroutine to another.
+
+Go literature distinguishes between:
+
+- *concurrency*: "keeping track of things that could be done in parallel"
+- *parallelism*: "actually doing things in parallel"
+
+The application expresses concurrency by starting goroutines for potentially
+concurrent tasks. The Go run-times schedule the activity of goroutines onto a
+small number (possibly one) of actual parallel executions.
+
+Even with *no* parallelism, concurrency lets the Go run-times *order* work with
+respect to events like file descriptors being readable/writable, channels having
+data, timers firing etc. Go automatically takes care of switching out goroutines
+that block or sleep so it is normal to write code in terms of blocking calls.
+
+Event-driven programming (such as poll, epoll, select or the `proton` package)
+also channels unpredictably ordered events to actions in one or a small pool of
+execution threads. However this requires a different style of programming:
+"event-driven" or "reactive" programming. Go developers call it "inside-out"
+programming. In an event-driven architecture blocking is a big problem as it
+consumes a scarce thread of execution, so actions that take time to complete
+have to be re-structured in terms of future event delivery.
+
+The promise of Go is that you can express your application in concurrent,
+procedural terms with simple blocking calls and the Go run-times will turn it
+inside-out for you. Write as many goroutines as you want, and let Go interleave
+and schedule them efficiently.
+
+For example: the Go equivalent of listening for connections is a goroutine with
+a simple endless loop that calls a blocking Listen() function and starts a
+goroutine for each new connection. Each connection has its own goroutine that
+deals with just that connection till it closes.
+
+The benefit is that the variables and logic live closer together. Once you're in
+a goroutine, you have everything you need in local variables, and they are
+preserved across blocking calls. There's no need to store details in context
+objects that you have to look up when handling a later event to figure out how
+to continue where you left off.
+
+The `proton` API is important because it is close to the original proton-C
+reactive API and gives you direct access to the underlying library. However it
+is un-Go-like in it's event-driven nature, and it requires care as methods on
+values associated with the same underlying proton engine are not
+concurrent-safe.
+
+The `electron` API hides the event-driven details behind a simple blocking API
+that can be safely called from arbitrary goroutines. Under the covers data is
+passed through channels to dedicated goroutines running separate `proton` event
+loops for each connection.
+
+## New to Go?
+
+If you are new to Go then these are a good place to start:
+
+- [A Tour of Go](http://tour.golang.org)
+- [Effective Go](http://golang.org/doc/effective_go.html)
+
+Then look at the tools and library docs at <http://golang.org> as you need them.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
new file mode 100644
index 0000000..323c344
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/doc.go
@@ -0,0 +1,34 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/*
+Package amqp encodes and decodes AMQP messages and data types as Go types.
+
+It follows the standard 'encoding' libraries pattern. The mapping between AMQP
+and Go types is described in the documentation of the Marshal and Unmarshal
+functions.
+
+AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/>
+*/
+package amqp
+
+// #cgo LDFLAGS: -lqpid-proton
+import "C"
+
+// This file is just for the package comment.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
new file mode 100644
index 0000000..868dbf3
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/error.go
@@ -0,0 +1,66 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+import (
+ "fmt"
+ "reflect"
+)
+
+// Error is an AMQP error condition. It has a name and a description.
+// It implements the Go error interface so can be returned as an error value.
+//
+// You can pass amqp.Error to methods that pass an error to a remote endpoint,
+// this gives you full control over what the remote endpoint will see.
+//
+// You can also pass any Go error to such functions, the remote peer
+// will see the equivalent of MakeError(error)
+//
+type Error struct{ Name, Description string }
+
+// Error implements the Go error interface for AMQP error errors.
+func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) }
+
+// Errorf makes a Error with name and formatted description as per fmt.Sprintf
+func Errorf(name, format string, arg ...interface{}) Error {
+ return Error{name, fmt.Sprintf(format, arg...)}
+}
+
+// MakeError makes an AMQP error from a go error using the Go error type as the name
+// and the err.Error() string as the description.
+func MakeError(err error) Error {
+ return Error{reflect.TypeOf(err).Name(), err.Error()}
+}
+
+var (
+ InternalError = "amqp:internal-error"
+ NotFound = "amqp:not-found"
+ UnauthorizedAccess = "amqp:unauthorized-access"
+ DecodeError = "amqp:decode-error"
+ ResourceLimit = "amqp:resource-limit"
+ NotAllowed = "amqp:not-allowed"
+ InvalidField = "amqp:invalid-field"
+ NotImplemented = "amqp:not-implemented"
+ ResourceLocked = "amqp:resource-locked"
+ PreerrorFailed = "amqp:preerror-failed"
+ ResourceDeleted = "amqp:resource-deleted"
+ IllegalState = "amqp:illegal-state"
+ FrameSizeTooSmall = "amqp:frame-size-too-small"
+)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/interop
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/interop b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop
new file mode 120000
index 0000000..ad6fcad
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop
@@ -0,0 +1 @@
+../../../../../../tests/interop
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go
new file mode 100644
index 0000000..b36ef64
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/interop_test.go
@@ -0,0 +1,381 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Test that conversion of Go type to/from AMQP is compatible with other
+// bindings.
+//
+package amqp
+
+import (
+ "bytes"
+ "fmt"
+ "io"
+ "io/ioutil"
+ "os"
+ "reflect"
+ "strings"
+ "testing"
+)
+
+func checkEqual(want interface{}, got interface{}) error {
+ if !reflect.DeepEqual(want, got) {
+ return fmt.Errorf("%#v != %#v", want, got)
+ }
+ return nil
+}
+
+func getReader(name string) (r io.Reader) {
+ r, err := os.Open("interop/" + name + ".amqp")
+ if err != nil {
+ panic(fmt.Errorf("Can't open %#v: %v", name, err))
+ }
+ return
+}
+
+func remaining(d *Decoder) string {
+ remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader))
+ return string(remainder)
+}
+
+// checkDecode: want is the expected value, gotPtr is a pointer to a
+// instance of the same type for Decode.
+func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) {
+
+ if err := d.Decode(gotPtr); err != nil {
+ t.Error("Decode failed", err)
+ return
+ }
+ got := reflect.ValueOf(gotPtr).Elem().Interface()
+ if err := checkEqual(want, got); err != nil {
+ t.Error("Decode bad value:", err)
+ return
+ }
+
+ // Try round trip encoding
+ bytes, err := Marshal(want, nil)
+ if err != nil {
+ t.Error("Marshal failed", err)
+ return
+ }
+ n, err := Unmarshal(bytes, gotPtr)
+ if err != nil {
+ t.Error("Unmarshal failed", err)
+ return
+ }
+ if err := checkEqual(n, len(bytes)); err != nil {
+ t.Error("Bad unmarshal length", err)
+ return
+ }
+ got = reflect.ValueOf(gotPtr).Elem().Interface()
+ if err = checkEqual(want, got); err != nil {
+ t.Error("Bad unmarshal value", err)
+ return
+ }
+}
+
+func TestUnmarshal(t *testing.T) {
+ bytes, err := ioutil.ReadAll(getReader("strings"))
+ if err != nil {
+ t.Error(err)
+ }
+ for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
+ var got string
+ n, err := Unmarshal(bytes, &got)
+ if err != nil {
+ t.Error(err)
+ }
+ if want != got {
+ t.Errorf("%#v != %#v", want, got)
+ }
+ bytes = bytes[n:]
+ }
+}
+
+func TestPrimitivesExact(t *testing.T) {
+ d := NewDecoder(getReader("primitives"))
+ // Decoding into exact types
+ var b bool
+ checkDecode(d, true, &b, t)
+ checkDecode(d, false, &b, t)
+ var u8 uint8
+ checkDecode(d, uint8(42), &u8, t)
+ var u16 uint16
+ checkDecode(d, uint16(42), &u16, t)
+ var i16 int16
+ checkDecode(d, int16(-42), &i16, t)
+ var u32 uint32
+ checkDecode(d, uint32(12345), &u32, t)
+ var i32 int32
+ checkDecode(d, int32(-12345), &i32, t)
+ var u64 uint64
+ checkDecode(d, uint64(12345), &u64, t)
+ var i64 int64
+ checkDecode(d, int64(-12345), &i64, t)
+ var f32 float32
+ checkDecode(d, float32(0.125), &f32, t)
+ var f64 float64
+ checkDecode(d, float64(0.125), &f64, t)
+}
+
+func TestPrimitivesCompatible(t *testing.T) {
+ d := NewDecoder(getReader("primitives"))
+ // Decoding into compatible types
+ var b bool
+ var i int
+ var u uint
+ var f float64
+ checkDecode(d, true, &b, t)
+ checkDecode(d, false, &b, t)
+ checkDecode(d, uint(42), &u, t)
+ checkDecode(d, uint(42), &u, t)
+ checkDecode(d, -42, &i, t)
+ checkDecode(d, uint(12345), &u, t)
+ checkDecode(d, -12345, &i, t)
+ checkDecode(d, uint(12345), &u, t)
+ checkDecode(d, -12345, &i, t)
+ checkDecode(d, 0.125, &f, t)
+ checkDecode(d, 0.125, &f, t)
+}
+
+// checkDecodeValue: want is the expected value, decode into a reflect.Value
+func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) {
+
+ var got, got2 interface{}
+ if err := d.Decode(&got); err != nil {
+ t.Error("Decode failed", err)
+ return
+ }
+ if err := checkEqual(want, got); err != nil {
+ t.Error(err)
+ return
+ }
+ // Try round trip encoding
+ bytes, err := Marshal(got, nil)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ n, err := Unmarshal(bytes, &got2)
+ if err != nil {
+ t.Error(err)
+ return
+ }
+ if err := checkEqual(n, len(bytes)); err != nil {
+ t.Error(err)
+ return
+ }
+ if err := checkEqual(want, got2); err != nil {
+ t.Error(err)
+ return
+ }
+}
+
+func TestPrimitivesInterface(t *testing.T) {
+ d := NewDecoder(getReader("primitives"))
+ checkDecodeInterface(d, true, t)
+ checkDecodeInterface(d, false, t)
+ checkDecodeInterface(d, uint8(42), t)
+ checkDecodeInterface(d, uint16(42), t)
+ checkDecodeInterface(d, int16(-42), t)
+ checkDecodeInterface(d, uint32(12345), t)
+ checkDecodeInterface(d, int32(-12345), t)
+ checkDecodeInterface(d, uint64(12345), t)
+ checkDecodeInterface(d, int64(-12345), t)
+ checkDecodeInterface(d, float32(0.125), t)
+ checkDecodeInterface(d, float64(0.125), t)
+}
+
+func TestStrings(t *testing.T) {
+ d := NewDecoder(getReader("strings"))
+ // Test decoding as plain Go strings
+ for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
+ var got string
+ checkDecode(d, want, &got, t)
+ }
+ remains := remaining(d)
+ if remains != "" {
+ t.Errorf("leftover: %s", remains)
+ }
+
+ // Test decoding as specific string types
+ d = NewDecoder(getReader("strings"))
+ var bytes []byte
+ var str, sym string
+ checkDecode(d, []byte("abc\000defg"), &bytes, t)
+ checkDecode(d, "abcdefg", &str, t)
+ checkDecode(d, "abcdefg", &sym, t)
+ checkDecode(d, make([]byte, 0), &bytes, t)
+ checkDecode(d, "", &str, t)
+ checkDecode(d, "", &sym, t)
+ remains = remaining(d)
+ if remains != "" {
+ t.Fatalf("leftover: %s", remains)
+ }
+
+ // Test some error handling
+ d = NewDecoder(getReader("strings"))
+ var s string
+ err := d.Decode(s)
+ if err == nil {
+ t.Fatal("Expected error")
+ }
+ if !strings.Contains(err.Error(), "not a pointer") {
+ t.Error(err)
+ }
+ var i int
+ err = d.Decode(&i)
+ if !strings.Contains(err.Error(), "cannot unmarshal") {
+ t.Error(err)
+ }
+ _, err = Unmarshal([]byte{}, nil)
+ if !strings.Contains(err.Error(), "not enough data") {
+ t.Error(err)
+ }
+ _, err = Unmarshal([]byte("foobar"), nil)
+ if !strings.Contains(err.Error(), "invalid-argument") {
+ t.Error(err)
+ }
+}
+
+func TestEncodeDecode(t *testing.T) {
+ type data struct {
+ s string
+ i int
+ u8 uint8
+ b bool
+ f float32
+ v interface{}
+ }
+
+ in := data{"foo", 42, 9, true, 1.234, "thing"}
+
+ buf := bytes.Buffer{}
+ e := NewEncoder(&buf)
+ if err := e.Encode(in.s); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.i); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.u8); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.b); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.f); err != nil {
+ t.Error(err)
+ }
+ if err := e.Encode(in.v); err != nil {
+ t.Error(err)
+ }
+
+ var out data
+ d := NewDecoder(&buf)
+ if err := d.Decode(&out.s); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.i); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.u8); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.b); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.f); err != nil {
+ t.Error(err)
+ }
+ if err := d.Decode(&out.v); err != nil {
+ t.Error(err)
+ }
+
+ if err := checkEqual(in, out); err != nil {
+ t.Error(err)
+ }
+}
+
+func TestMap(t *testing.T) {
+ d := NewDecoder(getReader("maps"))
+
+ // Generic map
+ var m Map
+ checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t)
+
+ // Interface as map
+ var i interface{}
+ checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t)
+
+ d = NewDecoder(getReader("maps"))
+ // Specific typed map
+ var m2 map[string]int
+ checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t)
+
+ // Nested map
+ m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}}
+ bytes, err := Marshal(m, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = Unmarshal(bytes, &i)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err = checkEqual(m, i); err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestList(t *testing.T) {
+ d := NewDecoder(getReader("lists"))
+ var l List
+ checkDecode(d, List{int32(32), "foo", true}, &l, t)
+ checkDecode(d, List{}, &l, t)
+}
+
+// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded as
+// as an AMQP string *inside* an AMQP binary?? Skip the test for now.
+func TODO_TestMessage(t *testing.T) {
+ bytes, err := ioutil.ReadAll(getReader("message"))
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ m, err := DecodeMessage(bytes)
+ if err != nil {
+ t.Fatal(err)
+ } else {
+ if err := checkEqual(m.Body(), "hello"); err != nil {
+ t.Error(err)
+ }
+ }
+
+ m2 := NewMessageWith("hello")
+ bytes2, err := m2.Encode(nil)
+ if err != nil {
+ t.Error(err)
+ } else {
+ if err = checkEqual(bytes, bytes2); err != nil {
+ t.Error(err)
+ }
+ }
+}
+
+// TODO aconway 2015-03-13: finish the full interop test
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
new file mode 100644
index 0000000..666b4f6
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
@@ -0,0 +1,250 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package amqp
+
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "io"
+ "qpid.apache.org/internal"
+ "reflect"
+ "unsafe"
+)
+
+func dataError(prefix string, data *C.pn_data_t) error {
+ err := internal.PnError(unsafe.Pointer(C.pn_data_error(data)))
+ if err != nil {
+ err = internal.Errorf("%s: %s", prefix, err.(internal.Error))
+ }
+ return err
+}
+
+/*
+Marshal encodes a Go value as AMQP data in buffer.
+If buffer is nil, or is not large enough, a new buffer is created.
+
+Returns the buffer used for encoding with len() adjusted to the actual size of data.
+
+Go types are encoded as follows
+
+ +-------------------------------------+--------------------------------------------+
+ |Go type |AMQP type |
+ +-------------------------------------+--------------------------------------------+
+ |bool |bool |
+ +-------------------------------------+--------------------------------------------+
+ |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) |
+ +-------------------------------------+--------------------------------------------+
+ |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) |
+ +-------------------------------------+--------------------------------------------+
+ |float32, float64 |float, double. |
+ +-------------------------------------+--------------------------------------------+
+ |string |string |
+ +-------------------------------------+--------------------------------------------+
+ |[]byte, Binary |binary |
+ +-------------------------------------+--------------------------------------------+
+ |Symbol |symbol |
+ +-------------------------------------+--------------------------------------------+
+ |interface{} |the contained type |
+ +-------------------------------------+--------------------------------------------+
+ |nil |null |
+ +-------------------------------------+--------------------------------------------+
+ |map[K]T |map with K and T converted as above |
+ +-------------------------------------+--------------------------------------------+
+ |Map |map, may have mixed types for keys, values |
+ +-------------------------------------+--------------------------------------------+
+ |[]T |list with T converted as above |
+ +-------------------------------------+--------------------------------------------+
+ |List |list, may have mixed types values |
+ +-------------------------------------+--------------------------------------------+
+
+The following Go types cannot be marshaled: uintptr, function, interface, channel
+
+TODO
+
+Go types: array, slice, struct, complex64/128.
+
+AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
+
+Described types.
+
+*/
+func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
+ defer doRecover(&err)
+ data := C.pn_data(0)
+ defer C.pn_data_free(data)
+ marshal(v, data)
+ encode := func(buf []byte) ([]byte, error) {
+ n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
+ switch {
+ case n == int(C.PN_OVERFLOW):
+ return buf, overflow
+ case n < 0:
+ return buf, dataError("marshal error", data)
+ default:
+ return buf[:n], nil
+ }
+ }
+ return encodeGrow(buffer, encode)
+}
+
+const minEncode = 256
+
+// overflow is returned when an encoding function can't fit data in the buffer.
+var overflow = internal.Errorf("buffer too small")
+
+// encodeFn encodes into buffer[0:len(buffer)].
+// Returns buffer with length adjusted for data encoded.
+// If buffer too small, returns overflow as error.
+type encodeFn func(buffer []byte) ([]byte, error)
+
+// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
+// Returns the final buffer.
+func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
+ if buffer == nil || len(buffer) == 0 {
+ buffer = make([]byte, minEncode)
+ }
+ var err error
+ for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
+ buffer = make([]byte, 2*len(buffer))
+ }
+ return buffer, err
+}
+
+func marshal(v interface{}, data *C.pn_data_t) {
+ switch v := v.(type) {
+ case nil:
+ C.pn_data_put_null(data)
+ case bool:
+ C.pn_data_put_bool(data, C.bool(v))
+ case int8:
+ C.pn_data_put_byte(data, C.int8_t(v))
+ case int16:
+ C.pn_data_put_short(data, C.int16_t(v))
+ case int32:
+ C.pn_data_put_int(data, C.int32_t(v))
+ case int64:
+ C.pn_data_put_long(data, C.int64_t(v))
+ case int:
+ if unsafe.Sizeof(0) == 8 {
+ C.pn_data_put_long(data, C.int64_t(v))
+ } else {
+ C.pn_data_put_int(data, C.int32_t(v))
+ }
+ case uint8:
+ C.pn_data_put_ubyte(data, C.uint8_t(v))
+ case uint16:
+ C.pn_data_put_ushort(data, C.uint16_t(v))
+ case uint32:
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ case uint64:
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ case uint:
+ if unsafe.Sizeof(0) == 8 {
+ C.pn_data_put_ulong(data, C.uint64_t(v))
+ } else {
+ C.pn_data_put_uint(data, C.uint32_t(v))
+ }
+ case float32:
+ C.pn_data_put_float(data, C.float(v))
+ case float64:
+ C.pn_data_put_double(data, C.double(v))
+ case string:
+ C.pn_data_put_string(data, pnBytes([]byte(v)))
+ case []byte:
+ C.pn_data_put_binary(data, pnBytes(v))
+ case Binary:
+ C.pn_data_put_binary(data, pnBytes([]byte(v)))
+ case Symbol:
+ C.pn_data_put_symbol(data, pnBytes([]byte(v)))
+ case Map: // Special map type
+ C.pn_data_put_map(data)
+ C.pn_data_enter(data)
+ for key, val := range v {
+ marshal(key, data)
+ marshal(val, data)
+ }
+ C.pn_data_exit(data)
+ default:
+ switch reflect.TypeOf(v).Kind() {
+ case reflect.Map:
+ putMap(data, v)
+ case reflect.Slice:
+ putList(data, v)
+ default:
+ panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
+ }
+ }
+ err := dataError("marshal", data)
+ if err != nil {
+ panic(err)
+ }
+ return
+}
+
+func clearMarshal(v interface{}, data *C.pn_data_t) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
+
+func putMap(data *C.pn_data_t, v interface{}) {
+ mapValue := reflect.ValueOf(v)
+ C.pn_data_put_map(data)
+ C.pn_data_enter(data)
+ for _, key := range mapValue.MapKeys() {
+ marshal(key.Interface(), data)
+ marshal(mapValue.MapIndex(key).Interface(), data)
+ }
+ C.pn_data_exit(data)
+}
+
+func putList(data *C.pn_data_t, v interface{}) {
+ listValue := reflect.ValueOf(v)
+ C.pn_data_put_list(data)
+ C.pn_data_enter(data)
+ for i := 0; i < listValue.Len(); i++ {
+ marshal(listValue.Index(i).Interface(), data)
+ }
+ C.pn_data_exit(data)
+}
+
+// Encoder encodes AMQP values to an io.Writer
+type Encoder struct {
+ writer io.Writer
+ buffer []byte
+}
+
+// New encoder returns a new encoder that writes to w.
+func NewEncoder(w io.Writer) *Encoder {
+ return &Encoder{w, make([]byte, minEncode)}
+}
+
+func (e *Encoder) Encode(v interface{}) (err error) {
+ e.buffer, err = Marshal(v, e.buffer)
+ if err == nil {
+ e.writer.Write(e.buffer)
+ }
+ return err
+}
+
+func replace(data *C.pn_data_t, v interface{}) {
+ C.pn_data_clear(data)
+ marshal(v, data)
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org