You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/01/18 01:16:23 UTC
[07/10] qpid-proton git commit: PROTON-1390: Get rid of relative
../util.go import, simplify examples.
PROTON-1390: Get rid of relative ../util.go import, simplify examples.
gccgo does not support relative imports, so simplify the examples to remove the need
for a common library.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0f156d72
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0f156d72
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0f156d72
Branch: refs/heads/go1
Commit: 0f156d721f56d265d1d36814f1334fd959ab0d09
Parents: 9fc393a
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Jan 17 19:12:55 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Jan 17 19:42:54 2017 -0500
----------------------------------------------------------------------
examples/go/electron/broker.go | 70 ++++++++++++++++++++++++++++--------
examples/go/electron/receive.go | 56 +++++++++++++++--------------
examples/go/electron/send.go | 51 +++++++++++++-------------
examples/go/proton/broker.go | 69 +++++++++++++++++++++++++++--------
examples/go/util/queue.go | 61 -------------------------------
examples/go/util/util.go | 68 -----------------------------------
6 files changed, 167 insertions(+), 208 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index 2078d1e..9228195 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -27,7 +27,6 @@ under the License.
package main
import (
- "../util"
"flag"
"fmt"
"log"
@@ -35,6 +34,7 @@ import (
"os"
"qpid.apache.org/amqp"
"qpid.apache.org/electron"
+ "sync"
)
// Usage and command-line flags
@@ -49,12 +49,19 @@ A simple broker-like demo. Queues are created automatically for sender or receiv
var addr = flag.String("addr", ":amqp", "Listening address")
var credit = flag.Int("credit", 100, "Receiver credit window")
var qsize = flag.Int("qsize", 1000, "Max queue size")
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var debugf = func(format string, data ...interface{}) {} // Default no debugging output
func main() {
flag.Usage = usage
flag.Parse()
+
+ if *debug {
+ debugf = func(format string, data ...interface{}) { log.Printf(format, data...) }
+ }
+
b := &broker{
- queues: util.MakeQueues(*qsize),
+ queues: makeQueues(*qsize),
container: electron.NewContainer(fmt.Sprintf("broker[%v]", os.Getpid())),
acks: make(chan electron.Outcome),
sent: make(chan sentMessage),
@@ -66,7 +73,7 @@ func main() {
// State for the broker
type broker struct {
- queues util.Queues // A collection of queues.
+ queues queues // A collection of queues.
container electron.Container // electron.Container manages AMQP connections.
sent chan sentMessage // Channel to record sent messages.
acks chan electron.Outcome // Channel to receive the Outcome of sent messages.
@@ -76,7 +83,7 @@ type broker struct {
// If a message is rejected or not acknowledged due to a failure, we will put it back on the queue.
type sentMessage struct {
m amqp.Message
- q util.Queue
+ q queue
}
// run listens for incoming net.Conn connections and starts an electron.Connection for each one.
@@ -94,12 +101,12 @@ func (b *broker) run() error {
for {
c, err := b.container.Accept(listener)
if err != nil {
- util.Debugf("Accept error: %v", err)
+ debugf("Accept error: %v", err)
continue
}
cc := &connection{b, c}
go cc.run() // Handle the connection
- util.Debugf("Accepted %v", c)
+ debugf("Accepted %v", c)
}
}
@@ -113,7 +120,7 @@ type connection struct {
// and start goroutines to service them.
func (c *connection) run() {
for in := range c.connection.Incoming() {
- util.Debugf("incoming %v", in)
+ debugf("incoming %v", in)
switch in := in.(type) {
@@ -131,7 +138,7 @@ func (c *connection) run() {
in.Accept() // Accept sessions unconditionally
}
}
- util.Debugf("incoming closed: %v", c.connection)
+ debugf("incoming closed: %v", c.connection)
}
// receiver receives messages and pushes to a queue.
@@ -139,11 +146,11 @@ func (c *connection) receiver(receiver electron.Receiver) {
q := c.broker.queues.Get(receiver.Target())
for {
if rm, err := receiver.Receive(); err == nil {
- util.Debugf("%v: received %v", receiver, util.FormatMessage(rm.Message))
+ debugf("%v: received %#v", receiver, rm.Message)
q <- rm.Message
rm.Accept()
} else {
- util.Debugf("%v error: %v", receiver, err)
+ debugf("%v error: %v", receiver, err)
break
}
}
@@ -154,13 +161,13 @@ func (c *connection) sender(sender electron.Sender) {
q := c.broker.queues.Get(sender.Source())
for {
if sender.Error() != nil {
- util.Debugf("%v closed: %v", sender, sender.Error())
+ debugf("%v closed: %v", sender, sender.Error())
return
}
select {
case m := <-q:
- util.Debugf("%v: sent %v", sender, util.FormatMessage(m))
+ debugf("%v: sent %#v", sender, m)
sm := sentMessage{m, q}
c.broker.sent <- sm // Record sent message
sender.SendAsync(m, c.broker.acks, sm) // Receive outcome on c.broker.acks with Value sm
@@ -191,9 +198,44 @@ func (b *broker) acknowledgements() {
delete(sentMap, sm)
if outcome.Status != electron.Accepted { // Error, release or rejection
sm.q.PutBack(sm.m) // Put the message back on the queue.
- util.Debugf("message %v put back, status %v, error %v",
- util.FormatMessage(sm.m), outcome.Status, outcome.Error)
+ debugf("message %#v put back, status %v, error %v", sm.m, outcome.Status, outcome.Error)
}
}
}
}
+
+// queue is a very simple message queue, implemented as a buffered channel of amqp.Message.
+type queue chan amqp.Message
+
+// PutBack sends m on the queue again without ever blocking the caller.
+func (q queue) PutBack(m amqp.Message) {
+ select {
+ case q <- m:
+ default:
+ // The send would block right now: hand it to a new goroutine. Not efficient, but the caller never waits.
+ go func() { q <- m }()
+ }
+}
+
+// queues is a concurrent-safe map from name to queue (see Get); build one with makeQueues.
+type queues struct {
+ queueSize int
+ m map[string]queue
+ lock sync.Mutex
+}
+
+func makeQueues(queueSize int) queues {
+ return queues{queueSize: queueSize, m: make(map[string]queue)}
+}
+
+// Get returns the queue called name, creating it with capacity queueSize if not found; the lock is held throughout.
+func (qs *queues) Get(name string) queue {
+ qs.lock.Lock()
+ defer qs.lock.Unlock()
+ q := qs.m[name]
+ if q == nil {
+ q = make(queue, qs.queueSize)
+ qs.m[name] = q
+ }
+ return q
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index 3bbe327..161e911 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -20,7 +20,6 @@ under the License.
package main
import (
- "../util"
"flag"
"fmt"
"log"
@@ -39,11 +38,16 @@ Receive messages from all the listed URLs concurrently and print them.
}
var count = flag.Uint64("count", 1, "Stop after receiving this many messages.")
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var debugf = func(format string, data ...interface{}) {} // Default no debugging output
func main() {
flag.Usage = usage
flag.Parse()
+ if *debug {
+ debugf = func(format string, data ...interface{}) { log.Printf(format, data...) }
+ }
urls := flag.Args() // Non-flag arguments are URLs to receive from
if len(urls) == 0 {
log.Println("No URL provided")
@@ -63,33 +67,31 @@ func main() {
// Start a goroutine to for each URL to receive messages and send them to the messages channel.
// main() receives and prints them.
for _, urlStr := range urls {
- util.Debugf("Connecting to %s\n", urlStr)
+ debugf("Connecting to %s\n", urlStr)
go func(urlStr string) { // Start the goroutine
-
- defer wait.Done() // Notify main() when this goroutine is done.
- url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
- util.ExitIf(err)
-
- // Open a new connection
- c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
- util.ExitIf(err)
- connections <- c // Save connection so we can Close() when main() ends
-
- // Create a Receiver using the path of the URL as the source address
- r, err := c.Receiver(electron.Source(url.Path))
- util.ExitIf(err)
-
- // Loop receiving messages and sending them to the main() goroutine
- for {
- if rm, err := r.Receive(); err == nil {
- rm.Accept()
- messages <- rm.Message
- } else if err == electron.Closed {
- return
- } else {
- log.Fatalf("receive error %v: %v", urlStr, err)
+ defer wait.Done() // Notify main() when this goroutine is done.
+ // Check each error where it occurs: a failed step is fatal for this example.
+ url, err := amqp.ParseURL(urlStr)
+ if err != nil {
+ log.Fatal(err)
+ }
+ c, err := container.Dial("tcp", url.Host)
+ if err != nil {
+ log.Fatal(err)
+ }
+ connections <- c // Save connection so we can Close() when main() ends
+ // Create a Receiver using the path of the URL as the source address
+ r, err := c.Receiver(electron.Source(url.Path))
+ if err != nil {
+ log.Fatal(err)
+ }
+ // Loop receiving messages and sending them to the main() goroutine
+ for {
+ if rm, err := r.Receive(); err == nil {
+ rm.Accept()
+ messages <- rm.Message
+ } else if err == electron.Closed {
+ return
+ } else {
+ log.Fatalf("receive error %v: %v", urlStr, err)
}
}
}(urlStr)
}
@@ -99,7 +101,7 @@ func main() {
// print each message until the count is exceeded.
for i := uint64(0); i < *count; i++ {
m := <-messages
- util.Debugf("%s\n", util.FormatMessage(m))
+ debugf("%#v\n", m)
}
fmt.Printf("Received %d messages\n", *count)
@@ -107,7 +109,7 @@ func main() {
// with electron.Closed.
for i := 0; i < len(urls); i++ {
c := <-connections
- util.Debugf("close %s", c)
+ debugf("close %s", c)
c.Close(nil)
}
wait.Wait() // Wait for all goroutines to finish.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index 68b8b2e..9ab5f1c 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -20,7 +20,6 @@ under the License.
package main
import (
- "../util"
"flag"
"fmt"
"log"
@@ -39,11 +38,17 @@ Send messages to each URL concurrently with body "<url-path>-<n>" where n is the
}
var count = flag.Int64("count", 1, "Send this may messages per address.")
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var Debugf = func(format string, data ...interface{}) {} // Default no debugging output
func main() {
flag.Usage = usage
flag.Parse()
+ if *debug {
+ Debugf = func(format string, data ...interface{}) { log.Printf(format, data...) }
+ }
+
urls := flag.Args() // Non-flag arguments are URLs to receive from
if len(urls) == 0 {
log.Println("No URL provided")
@@ -61,41 +66,39 @@ func main() {
// Start a goroutine for each URL to send messages.
for _, urlStr := range urls {
- util.Debugf("Connecting to %v\n", urlStr)
+ Debugf("Connecting to %v\n", urlStr)
go func(urlStr string) {
-
- defer wait.Done() // Notify main() that this goroutine is done.
- url, err := amqp.ParseURL(urlStr) // Like net/url.Parse() but with AMQP defaults.
- util.ExitIf(err)
-
- // Open a new connection
- c, err := container.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
- util.ExitIf(err)
- connections <- c // Save connection so we can Close() when main() ends
-
- // Create a Sender using the path of the URL as the AMQP address
- s, err := c.Sender(electron.Target(url.Path))
- util.ExitIf(err)
-
- // Loop sending messages.
- for i := int64(0); i < *count; i++ {
- m := amqp.NewMessage()
- body := fmt.Sprintf("%v-%v", url.Path, i)
- m.Marshal(body)
- s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
+ defer wait.Done() // Notify main() when this goroutine is done.
+ // Check each error where it occurs: a failed step is fatal for this example.
+ url, err := amqp.ParseURL(urlStr)
+ if err != nil {
+ log.Fatal(err)
+ }
+ c, err := container.Dial("tcp", url.Host)
+ if err != nil {
+ log.Fatal(err)
+ }
+ connections <- c // Save connection so we can Close() when main() ends
+ // Create a Sender using the path of the URL as the AMQP address
+ s, err := c.Sender(electron.Target(url.Path))
+ if err != nil {
+ log.Fatal(err)
+ }
+ // Loop sending messages.
+ for i := int64(0); i < *count; i++ {
+ m := amqp.NewMessage()
+ body := fmt.Sprintf("%v-%v", url.Path, i)
+ m.Marshal(body)
+ s.SendAsync(m, sentChan, body) // Outcome will be sent to sentChan
}
}(urlStr)
}
// Wait for all the acknowledgements
expect := int(*count) * len(urls)
- util.Debugf("Started senders, expect %v acknowledgements\n", expect)
+ Debugf("Started senders, expect %v acknowledgements\n", expect)
for i := 0; i < expect; i++ {
out := <-sentChan // Outcome of async sends.
if out.Error != nil {
log.Fatalf("acknowledgement[%v] %v error: %v\n", i, out.Value, out.Error)
} else {
- util.Debugf("acknowledgement[%v] %v (%v)\n", i, out.Value, out.Status)
+ Debugf("acknowledgement[%v] %v (%v)\n", i, out.Value, out.Status)
}
}
fmt.Printf("Received all %v acknowledgements\n", expect)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/proton/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/proton/broker.go b/examples/go/proton/broker.go
index 8f0efda..8291ca9 100644
--- a/examples/go/proton/broker.go
+++ b/examples/go/proton/broker.go
@@ -30,7 +30,6 @@ under the License.
package main
import (
- "../util"
"flag"
"fmt"
"log"
@@ -38,6 +37,7 @@ import (
"os"
"qpid.apache.org/amqp"
"qpid.apache.org/proton"
+ "sync"
)
// Usage and command-line flags
@@ -52,11 +52,16 @@ A simple broker-like demo. Queues are created automatically for sender or receiv
var addr = flag.String("addr", ":amqp", "Listening address")
var credit = flag.Int("credit", 100, "Receiver credit window")
var qsize = flag.Int("qsize", 1000, "Max queue size")
+var debug = flag.Bool("debug", false, "Print detailed debug output")
+var debugf = func(format string, data ...interface{}) {} // Default no debugging output
func main() {
flag.Usage = usage
flag.Parse()
- b := &broker{util.MakeQueues(*qsize)}
+ if *debug {
+ debugf = func(format string, data ...interface{}) { log.Printf(format, data...) }
+ }
+ b := &broker{makeQueues(*qsize)}
if err := b.run(); err != nil {
log.Fatal(err)
}
@@ -64,7 +69,7 @@ func main() {
// State for the broker
type broker struct {
- queues util.Queues
+ queues queues
}
// Listens for connections and starts a proton.Engine for each one.
@@ -78,7 +83,7 @@ func (b *broker) run() error {
for {
conn, err := listener.Accept()
if err != nil {
- util.Debugf("Accept error: %v", err)
+ debugf("Accept error: %v", err)
continue
}
adapter := proton.NewMessagingAdapter(newHandler(&b.queues))
@@ -88,14 +93,14 @@ func (b *broker) run() error {
adapter.AutoAccept = false
engine, err := proton.NewEngine(conn, adapter)
if err != nil {
- util.Debugf("Connection error: %v", err)
+ debugf("Connection error: %v", err)
continue
}
engine.Server() // Enable server-side protocol negotiation.
- util.Debugf("Accepted connection %s", engine)
+ debugf("Accepted connection %s", engine)
go func() { // Start goroutine to run the engine event loop
engine.Run()
- util.Debugf("Closed %s", engine)
+ debugf("Closed %s", engine)
}()
}
}
@@ -105,13 +110,13 @@ func (b *broker) run() error {
// all calls to the handler. We use channels to communicate between the handler
// goroutine and other goroutines sending and receiving messages.
type handler struct {
- queues *util.Queues
+ queues *queues
receivers map[proton.Link]*receiver
senders map[proton.Link]*sender
injecter proton.Injecter
}
-func newHandler(queues *util.Queues) *handler {
+func newHandler(queues *queues) *handler {
return &handler{
queues: queues,
receivers: make(map[proton.Link]*receiver),
@@ -156,7 +161,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
}
// This will not block as AMQP credit is set to the buffer capacity.
r.buffer <- receivedMessage{e.Delivery(), m}
- util.Debugf("link %s received %s", e.Link(), util.FormatMessage(m))
+ debugf("link %s received %#v", e.Link(), m)
case proton.MConnectionClosed, proton.MDisconnected:
for l, _ := range h.receivers {
@@ -187,11 +192,11 @@ func (h *handler) linkClosed(l proton.Link, err error) {
// channels.
type link struct {
l proton.Link
- q util.Queue
+ q queue
h *handler
}
-func makeLink(l proton.Link, q util.Queue, h *handler) link {
+func makeLink(l proton.Link, q queue, h *handler) link {
lnk := link{l: l, q: q, h: h}
return lnk
}
@@ -280,7 +285,7 @@ func (s *sender) sendable() {
// run runs in a separate goroutine. It monitors the queue for messages and injects
// a function to send them when there is credit
func (s *sender) run() {
- var q util.Queue // q is nil initially as we have no credit.
+ var q queue // q is nil initially as we have no credit.
for {
select {
case _, ok := <-s.credit:
@@ -323,9 +328,45 @@ func (s *sender) sendOne(m amqp.Message) error {
delivery, err := s.l.Send(m)
if err == nil {
delivery.Settle() // Pre-settled, unreliable.
- util.Debugf("link %s sent %s", s.l, util.FormatMessage(m))
+ debugf("link %s sent %#v", s.l, m)
} else {
s.q.PutBack(m) // Put the message back on the queue, don't block
}
return err
}
+
+// queue is a very simple message queue, implemented as a buffered channel of amqp.Message.
+type queue chan amqp.Message
+
+// PutBack sends m on the queue again without ever blocking the caller.
+func (q queue) PutBack(m amqp.Message) {
+ select {
+ case q <- m:
+ default:
+ // The send would block right now: hand it to a new goroutine. Not efficient, but the caller never waits.
+ go func() { q <- m }()
+ }
+}
+
+// queues is a concurrent-safe map from name to queue (see Get); build one with makeQueues.
+type queues struct {
+ queueSize int
+ m map[string]queue
+ lock sync.Mutex
+}
+
+func makeQueues(queueSize int) queues {
+ return queues{queueSize: queueSize, m: make(map[string]queue)}
+}
+
+// Get returns the queue called name, creating it with capacity queueSize if not found; the lock is held throughout.
+func (qs *queues) Get(name string) queue {
+ qs.lock.Lock()
+ defer qs.lock.Unlock()
+ q := qs.m[name]
+ if q == nil {
+ q = make(queue, qs.queueSize)
+ qs.m[name] = q
+ }
+ return q
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/util/queue.go
----------------------------------------------------------------------
diff --git a/examples/go/util/queue.go b/examples/go/util/queue.go
deleted file mode 100644
index 2eaba72..0000000
--- a/examples/go/util/queue.go
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package util
-
-import (
- "qpid.apache.org/amqp"
- "sync"
-)
-
-// Use a buffered channel as a very simple queue.
-type Queue chan amqp.Message
-
-// Put a message back on the queue, does not block.
-func (q Queue) PutBack(m amqp.Message) {
- select {
- case q <- m:
- default:
- // Not an efficient implementation but ensures we don't block the caller.
- go func() { q <- m }()
- }
-}
-
-// Concurrent-safe map of queues.
-type Queues struct {
- queueSize int
- m map[string]Queue
- lock sync.Mutex
-}
-
-func MakeQueues(queueSize int) Queues {
- return Queues{queueSize: queueSize, m: make(map[string]Queue)}
-}
-
-// Create a queue if not found.
-func (qs *Queues) Get(name string) Queue {
- qs.lock.Lock()
- defer qs.lock.Unlock()
- q := qs.m[name]
- if q == nil {
- q = make(Queue, qs.queueSize)
- qs.m[name] = q
- }
- return q
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0f156d72/examples/go/util/util.go
----------------------------------------------------------------------
diff --git a/examples/go/util/util.go b/examples/go/util/util.go
deleted file mode 100644
index 20f2192..0000000
--- a/examples/go/util/util.go
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-// util contains utility types and functions to simplify parts of the example
-// code that are not related to the use of proton.
-package util
-
-import (
- "flag"
- "fmt"
- "log"
- "os"
- "path"
- "qpid.apache.org/amqp"
-)
-
-// Debug flag "-debug" enables debug output with Debugf
-var Debug = flag.Bool("debug", false, "Print detailed debug output")
-
-// Full flag "-full" enables full message output by FormatMessage
-var Full = flag.Bool("full", false, "Print full message not just body.")
-
-// Debugf logs debug messages if "-debug" flag is set.
-func Debugf(format string, data ...interface{}) {
- if *Debug {
- log.Printf(format, data...)
- }
-}
-
-// Simple error handling for demo.
-func ExitIf(err error) {
- if err != nil {
- log.Fatal(err)
- }
-}
-
-// FormatMessage formats a message as a string, just the body by default or
-// the full message (with properties etc.) if "-full" flag is set.
-func FormatMessage(m amqp.Message) string {
- if *Full {
- return fmt.Sprintf("%#v", m)
- } else {
- return fmt.Sprintf("%#v", m.Body())
- }
-}
-
-// For example programs, use the program name as the log prefix.
-func init() {
- log.SetFlags(0)
- _, prog := path.Split(os.Args[0])
- log.SetPrefix(fmt.Sprintf("%s(%d): ", prog, os.Getpid()))
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org