You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/10/19 12:38:02 UTC
qpid-proton git commit: NO-JIRA: [go] Improved client/server example,
minor doc updates
Repository: qpid-proton
Updated Branches:
refs/heads/master dbda49bb2 -> 97815c342
NO-JIRA: [go] Improved client/server example, minor doc updates
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/97815c34
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/97815c34
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/97815c34
Branch: refs/heads/master
Commit: 97815c342c68c4ee2f9919ccd20e8493296b1743
Parents: dbda49b
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Oct 18 16:58:47 2017 +0100
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 19 13:37:27 2017 +0100
----------------------------------------------------------------------
examples/go/electron/receive.go | 2 +-
examples/go/electron/send.go | 2 +-
.../src/qpid.apache.org/electron/connection.go | 24 ++++--
.../src/qpid.apache.org/electron/container.go | 3 +-
.../electron/ex_client_server_test.go | 81 -------------------
.../electron/example_client_server_test.go | 85 ++++++++++++++++++++
6 files changed, 105 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index 93046b3..568fcc9 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -74,7 +74,7 @@ func main() {
defer wait.Done() // Notify main() when this goroutine is done.
url, err := amqp.ParseURL(urlStr)
fatalIf(err)
- c, err := container.Dial("tcp", url.Host)
+ c, err := container.Dial("tcp", url.Host) // NOTE: Dial takes just the Host part of the URL
fatalIf(err)
connections <- c // Save connection so we can Close() when main() ends
addr := strings.TrimPrefix(url.Path, "/")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index f96400b..ac4e367 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -73,7 +73,7 @@ func main() {
defer wait.Done() // Notify main() when this goroutine is done.
url, err := amqp.ParseURL(urlStr)
fatalIf(err)
- c, err := container.Dial("tcp", url.Host)
+ c, err := container.Dial("tcp", url.Host) // NOTE: Dial takes just the Host part of the URL
fatalIf(err)
connections <- c // Save connection so we can Close() when main() ends
addr := strings.TrimPrefix(url.Path, "/")
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 8f62491..267ee1e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -83,11 +83,17 @@ type Connection interface {
WaitTimeout(time.Duration) error
// Incoming returns a channel for incoming endpoints opened by the remote peer.
- // See the Incoming interface for more.
+ // See the Incoming interface for more detail.
//
- // Not receiving from Incoming() and calling Accept/Reject will block the
- // electron event loop. You should run a loop to handle the types that
- // interest you in a switch{} and and Accept() all others.
+ // Note: this channel will first return an *IncomingConnection for the
+ // connection itself which allows you to look at security information and
+ // decide whether to Accept() or Reject() the connection. Then it will return
+ // *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened
+ // by the remote end.
+ //
+ // Note 2: you must receiving from Incoming() and call Accept/Reject to avoid
+ // blocking electron event loop. Normally you would run a loop in a goroutine
+ // to handle incoming types that interest and Accept() those that don't.
Incoming() <-chan Incoming
}
@@ -387,8 +393,9 @@ func globalSASLInit(eng *proton.Engine) {
}
// Dial is shorthand for using net.Dial() then NewConnection()
-func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) {
- conn, err := net.Dial(network, addr)
+// See net.Dial() for the meaning of the network, address arguments.
+func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) {
+ conn, err := net.Dial(network, address)
if err == nil {
c, err = NewConnection(conn, opts...)
}
@@ -396,8 +403,9 @@ func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err err
}
// DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
-func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) {
- conn, err := dialer.Dial(network, addr)
+// See net.Dial() for the meaning of the network, address arguments.
+func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error) {
+ conn, err := dialer.Dial(network, address)
if err == nil {
c, err = NewConnection(conn, opts...)
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
index efb24ff..7c19aa5 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
@@ -43,7 +43,8 @@ type Container interface {
// Dial is shorthand for
// conn, err := net.Dial(); c, err := Connection(conn, opts...)
- Dial(network string, addr string, opts ...ConnectionOption) (Connection, error)
+ // See net.Dial() for the meaning of the network, address arguments.
+ Dial(network string, address string, opts ...ConnectionOption) (Connection, error)
// Accept is shorthand for:
// conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server()...)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
deleted file mode 100644
index 93f275b..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package electron_test
-
-import (
- "fmt"
- "net"
- "qpid.apache.org/amqp"
- "qpid.apache.org/electron"
-)
-
-// Print errors
-func check(msg string, err error) bool {
- if err != nil {
- fmt.Printf("%s: %s\n", msg, err)
- }
- return err == nil
-}
-
-func runServer(cont electron.Container, l net.Listener) {
- for c, err := cont.Accept(l); check("accept connection", err); c, err = cont.Accept(l) {
- go func() { // Process connections concurrently, accepting AMQP endpoints
- for in := range c.Incoming() {
- ep := in.Accept() // Accept all endpoints
- go func() { // Process endpoints concurrently
- switch ep := ep.(type) {
- case electron.Sender:
- m := amqp.NewMessageWith("hello yourself")
- fmt.Printf("server %q sending %q\n", ep.Source(), m.Body())
- ep.SendForget(m) // One-way send, client does not need to Accept.
- case electron.Receiver:
- if rm, err := ep.Receive(); check("server receive", err) {
- fmt.Printf("server %q received %q\n", ep.Target(), rm.Message.Body())
- err := rm.Accept() // Client is waiting for Accept.
- check("accept message", err)
- }
- }
- }()
- }
- }()
- }
-}
-
-func startServer() (addr net.Addr) {
- cont := electron.NewContainer("server")
- if l, err := net.Listen("tcp", ""); check("listen", err) {
- addr = l.Addr()
- go runServer(cont, l)
- }
- return addr
-}
-
-// Connect to addr and send/receive a message.
-func client(addr net.Addr) {
- if c, err := electron.Dial(addr.Network(), addr.String()); check("dial", err) {
- defer c.Close(nil)
- if s, err := c.Sender(electron.Target("target")); check("sender", err) {
- fmt.Printf("client sending\n")
- s.SendSync(amqp.NewMessageWith("hello")) // Send and wait for server to Accept()
- }
- if r, err := c.Receiver(electron.Source("source")); check("receiver", err) {
- if rm, err := r.Receive(); err == nil {
- fmt.Printf("client received %q\n", rm.Message.Body())
- }
- }
- }
-}
-
-// Example client and server communicating via AMQP over a TCP/IP connection.
-//
-// Normally client and server would be separate processes.
-// For more realistic examples:
-// https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
-//
-func Example_clientServer() {
- addr := startServer()
- client(addr)
- // Output:
- // client sending
- // server "target" received "hello"
- // server "source" sending "hello yourself"
- // client received "hello yourself"
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
new file mode 100644
index 0000000..3aa5892
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
@@ -0,0 +1,85 @@
+package electron_test
+
+import (
+ "fmt"
+ "net"
+ "qpid.apache.org/amqp"
+ "qpid.apache.org/electron"
+ "sync"
+)
+
+// Example Server that accepts a single Connection, Session and Receiver link
+// and prints messages received until the link closes.
+func Server(l net.Listener) {
+ cont := electron.NewContainer("server")
+ c, _ := cont.Accept(l) // Ignoring error handling
+ l.Close() // This server only accepts one connection
+ // Process incoming endpoints till we get a Receiver link
+ var r electron.Receiver
+ for r == nil {
+ in := <-c.Incoming()
+ switch in := in.(type) {
+ case *electron.IncomingSession, *electron.IncomingConnection:
+ in.Accept() // Accept the incoming connection and session for the receiver
+ case *electron.IncomingReceiver:
+ in.SetCapacity(10)
+ in.SetPrefetch(true) // Automatic flow control for a buffer of 10 messages.
+ r = in.Accept().(electron.Receiver)
+ case nil:
+ return // Connection is closed
+ default:
+ in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
+ }
+ }
+ go func() { // Reject any further incoming endpoints
+ for in := range c.Incoming() {
+ in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
+ }
+ }()
+ // Receive messages till the Receiver closes
+ rm, err := r.Receive()
+ for ; err == nil; rm, err = r.Receive() {
+ fmt.Printf("server received: %q\n", rm.Message.Body())
+ rm.Accept() // Signal to the client that the message was accepted
+ }
+ fmt.Printf("server receiver closed: %v\n", err)
+}
+
+// Example client sending messages to a server running in a goroutine.
+//
+// Normally client and server would be separate processes. For more realistic and detailed examples:
+// https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
+//
+func Example_clientServer() {
+ // NOTE: We ignoring error handling in this example
+ l, _ := net.Listen("tcp", "") // Open a listening port for server, client connect to this port
+
+ // SERVER: start the server running in a separate goroutine
+ var waitServer sync.WaitGroup // We will wait for the server goroutine to finish before exiting
+ waitServer.Add(1)
+ go func() { // Run the server in the background
+ defer waitServer.Done()
+ Server(l)
+ }()
+
+ // CLIENT: Send messages to the server
+ addr := l.Addr()
+ c, _ := electron.Dial(addr.Network(), addr.String())
+ s, _ := c.Sender()
+ for i := 0; i < 3; i++ {
+ msg := fmt.Sprintf("hello %v", i)
+ // Send and wait for the Outcome from the server.
+ // Note: For higher throughput, use SendAsync() to send a stream of messages
+ // and process the returning stream of Outcomes concurrently.
+ s.SendSync(amqp.NewMessageWith(msg))
+ }
+ c.Close(nil) // Closing the connection will stop the server
+
+ waitServer.Wait() // Let the server finish
+
+ // Output:
+ // server received: "hello 0"
+ // server received: "hello 1"
+ // server received: "hello 2"
+ // server receiver closed: EOF
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org