You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/01/03 22:03:01 UTC
[29/30] qpid-proton git commit: NO-JIRA: Go example error handling and logging improvements
NO-JIRA: Go example error handling and logging improvements
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4abf23af
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4abf23af
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4abf23af
Branch: refs/heads/go1
Commit: 4abf23afa65fea25efa5d56e11d58af9d1ec98bc
Parents: cd6ecc5
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Jan 3 16:52:44 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Jan 3 16:52:44 2017 -0500
----------------------------------------------------------------------
examples/go/electron/broker.go | 23 ++++++++------------
examples/go/electron/receive.go | 12 +++++-----
examples/go/electron/send.go | 4 ++--
.../go/src/qpid.apache.org/electron/endpoint.go | 4 ++++
.../go/src/qpid.apache.org/proton/wrappers.go | 8 +++----
5 files changed, 26 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4abf23af/examples/go/electron/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/broker.go b/examples/go/electron/broker.go
index d698838..2078d1e 100644
--- a/examples/go/electron/broker.go
+++ b/examples/go/electron/broker.go
@@ -55,7 +55,7 @@ func main() {
flag.Parse()
b := &broker{
queues: util.MakeQueues(*qsize),
- container: electron.NewContainer(fmt.Sprintf("broker[%s]", os.Getpid())),
+ container: electron.NewContainer(fmt.Sprintf("broker[%v]", os.Getpid())),
acks: make(chan electron.Outcome),
sent: make(chan sentMessage),
}
@@ -113,28 +113,23 @@ type connection struct {
// and start goroutines to service them.
func (c *connection) run() {
for in := range c.connection.Incoming() {
+ util.Debugf("incoming %v", in)
+
switch in := in.(type) {
case *electron.IncomingSender:
- if in.Source() == "" {
- in.Reject(fmt.Errorf("no source"))
- } else {
- go c.sender(in.Accept().(electron.Sender))
- }
+ s := in.Accept().(electron.Sender)
+ go c.sender(s)
case *electron.IncomingReceiver:
- if in.Target() == "" {
- in.Reject(fmt.Errorf("no target"))
- } else {
- in.SetPrefetch(true)
- in.SetCapacity(*credit) // Pre-fetch up to credit window.
- go c.receiver(in.Accept().(electron.Receiver))
- }
+ in.SetPrefetch(true)
+ in.SetCapacity(*credit) // Pre-fetch up to credit window.
+ r := in.Accept().(electron.Receiver)
+ go c.receiver(r)
default:
in.Accept() // Accept sessions unconditionally
}
- util.Debugf("incoming: %v", in)
}
util.Debugf("incoming closed: %v", c.connection)
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4abf23af/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index 7a505d8..3bbe327 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -57,7 +57,7 @@ func main() {
var wait sync.WaitGroup // Used by main() to wait for all goroutines to end.
wait.Add(len(urls)) // Wait for one goroutine per URL.
- container := electron.NewContainer(fmt.Sprintf("receive[%s]", os.Getpid()))
+ container := electron.NewContainer(fmt.Sprintf("receive[%v]", os.Getpid()))
connections := make(chan electron.Connection, len(urls)) // Connections to close on exit
// Start a goroutine to for each URL to receive messages and send them to the messages channel.
@@ -81,12 +81,13 @@ func main() {
// Loop receiving messages and sending them to the main() goroutine
for {
- if rm, err := r.Receive(); err != nil {
- util.Debugf("closed %v: %v", urlStr, err)
- return
- } else {
+ if rm, err := r.Receive(); err == nil {
rm.Accept()
messages <- rm.Message
+ } else if err == electron.Closed {
+ return
+ } else {
+ log.Fatalf("receive error %v: %v", urlStr, err)
}
}
}(urlStr)
@@ -103,6 +104,7 @@ func main() {
fmt.Printf("Received %d messages\n", *count)
// Close all connections, this will interrupt goroutines blocked in Receiver.Receive()
+ // with electron.Closed.
for i := 0; i < len(urls); i++ {
c := <-connections
util.Debugf("close %s", c)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4abf23af/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index 4ea93ec..68b8b2e 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -56,7 +56,7 @@ func main() {
var wait sync.WaitGroup
wait.Add(len(urls)) // Wait for one goroutine per URL.
- container := electron.NewContainer(fmt.Sprintf("send[%s]", os.Getpid()))
+ container := electron.NewContainer(fmt.Sprintf("send[%v]", os.Getpid()))
connections := make(chan electron.Connection, len(urls)) // Connctions to close on exit
// Start a goroutine for each URL to send messages.
@@ -93,7 +93,7 @@ func main() {
for i := 0; i < expect; i++ {
out := <-sentChan // Outcome of async sends.
if out.Error != nil {
- util.Debugf("acknowledgement[%v] %v error: %v\n", i, out.Value, out.Error)
+ log.Fatalf("acknowledgement[%v] %v error: %v\n", i, out.Value, out.Error)
} else {
util.Debugf("acknowledgement[%v] %v (%v)\n", i, out.Value, out.Status)
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4abf23af/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index ca93e5b..a6ce6c8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -29,6 +29,10 @@ import (
// was closed cleanly.
var Closed = io.EOF
+// EOF is an alias for io.EOF. It is returned as an error when an endpoint
+// was closed cleanly.
+var EOF = io.EOF
+
// Endpoint is the local end of a communications channel to the remote peer
// process. The following interface implement Endpoint: Connection, Session,
// Sender and Receiver.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4abf23af/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 4d55e4f..b6386b8 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -169,14 +169,14 @@ type Endpoint interface {
RemoteCondition() Condition
// Human readable name
String() string
- // Human readable endpoint type "link", "session" etc.
+ // Human readable endpoint type "sender-link", "session" etc.
Type() string
}
// CloseError sets an error condition (if err != nil) on an endpoint and closes
// the endpoint if not already closed
func CloseError(e Endpoint, err error) {
- if err != nil {
+ if err != nil && !e.Condition().IsSet() {
e.Condition().SetError(err)
}
e.Close()
@@ -262,9 +262,9 @@ func (l Link) String() string {
func (l Link) Type() string {
if l.IsSender() {
- return "link(sender)"
+ return "sender-link"
} else {
- return "link(receiver)"
+ return "receiver-link"
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org