You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/10/19 12:38:02 UTC

qpid-proton git commit: NO-JIRA: [go] Improved client/server example, minor doc updates

Repository: qpid-proton
Updated Branches:
  refs/heads/master dbda49bb2 -> 97815c342


NO-JIRA: [go] Improved client/server example, minor doc updates


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/97815c34
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/97815c34
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/97815c34

Branch: refs/heads/master
Commit: 97815c342c68c4ee2f9919ccd20e8493296b1743
Parents: dbda49b
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Oct 18 16:58:47 2017 +0100
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 19 13:37:27 2017 +0100

----------------------------------------------------------------------
 examples/go/electron/receive.go                 |  2 +-
 examples/go/electron/send.go                    |  2 +-
 .../src/qpid.apache.org/electron/connection.go  | 24 ++++--
 .../src/qpid.apache.org/electron/container.go   |  3 +-
 .../electron/ex_client_server_test.go           | 81 -------------------
 .../electron/example_client_server_test.go      | 85 ++++++++++++++++++++
 6 files changed, 105 insertions(+), 92 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/examples/go/electron/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/receive.go b/examples/go/electron/receive.go
index 93046b3..568fcc9 100644
--- a/examples/go/electron/receive.go
+++ b/examples/go/electron/receive.go
@@ -74,7 +74,7 @@ func main() {
 			defer wait.Done() // Notify main() when this goroutine is done.
 			url, err := amqp.ParseURL(urlStr)
 			fatalIf(err)
-			c, err := container.Dial("tcp", url.Host)
+			c, err := container.Dial("tcp", url.Host) // NOTE: Dial takes just the Host part of the URL
 			fatalIf(err)
 			connections <- c // Save connection so we can Close() when main() ends
 			addr := strings.TrimPrefix(url.Path, "/")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/examples/go/electron/send.go
----------------------------------------------------------------------
diff --git a/examples/go/electron/send.go b/examples/go/electron/send.go
index f96400b..ac4e367 100644
--- a/examples/go/electron/send.go
+++ b/examples/go/electron/send.go
@@ -73,7 +73,7 @@ func main() {
 			defer wait.Done() // Notify main() when this goroutine is done.
 			url, err := amqp.ParseURL(urlStr)
 			fatalIf(err)
-			c, err := container.Dial("tcp", url.Host)
+			c, err := container.Dial("tcp", url.Host) // NOTE: Dial takes just the Host part of the URL
 			fatalIf(err)
 			connections <- c // Save connection so we can Close() when main() ends
 			addr := strings.TrimPrefix(url.Path, "/")

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 8f62491..267ee1e 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -83,11 +83,17 @@ type Connection interface {
 	WaitTimeout(time.Duration) error
 
 	// Incoming returns a channel for incoming endpoints opened by the remote peer.
-	// See the Incoming interface for more.
+	// See the Incoming interface for more detail.
 	//
-	// Not receiving from Incoming() and calling Accept/Reject will block the
-	// electron event loop. You should run a loop to handle the types that
-	// interest you in a switch{} and and Accept() all others.
+	// Note: this channel will first return an *IncomingConnection for the
+	// connection itself which allows you to look at security information and
+	// decide whether to Accept() or Reject() the connection. Then it will return
+	// *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened
+	// by the remote end.
+	//
+	// Note 2: you must receive from Incoming() and call Accept/Reject to avoid
+	// blocking the electron event loop. Normally you would run a loop in a goroutine
+	// to handle incoming types that interest you and Accept() those that don't.
 	Incoming() <-chan Incoming
 }
 
@@ -387,8 +393,9 @@ func globalSASLInit(eng *proton.Engine) {
 }
 
 // Dial is shorthand for using net.Dial() then NewConnection()
-func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) {
-	conn, err := net.Dial(network, addr)
+// See net.Dial() for the meaning of the network, address arguments.
+func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) {
+	conn, err := net.Dial(network, address)
 	if err == nil {
 		c, err = NewConnection(conn, opts...)
 	}
@@ -396,8 +403,9 @@ func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err err
 }
 
 // DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
-func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) {
-	conn, err := dialer.Dial(network, addr)
+// See net.Dial() for the meaning of the network, address arguments.
+func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error) {
+	conn, err := dialer.Dial(network, address)
 	if err == nil {
 		c, err = NewConnection(conn, opts...)
 	}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
index efb24ff..7c19aa5 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/container.go
@@ -43,7 +43,8 @@ type Container interface {
 
 	// Dial is shorthand for
 	//     conn, err := net.Dial(); c, err := Connection(conn, opts...)
-	Dial(network string, addr string, opts ...ConnectionOption) (Connection, error)
+	// See net.Dial() for the meaning of the network, address arguments.
+	Dial(network string, address string, opts ...ConnectionOption) (Connection, error)
 
 	// Accept is shorthand for:
 	//     conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server()...)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
deleted file mode 100644
index 93f275b..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/ex_client_server_test.go
+++ /dev/null
@@ -1,81 +0,0 @@
-package electron_test
-
-import (
-	"fmt"
-	"net"
-	"qpid.apache.org/amqp"
-	"qpid.apache.org/electron"
-)
-
-//  Print errors
-func check(msg string, err error) bool {
-	if err != nil {
-		fmt.Printf("%s: %s\n", msg, err)
-	}
-	return err == nil
-}
-
-func runServer(cont electron.Container, l net.Listener) {
-	for c, err := cont.Accept(l); check("accept connection", err); c, err = cont.Accept(l) {
-		go func() { // Process connections concurrently, accepting AMQP endpoints
-			for in := range c.Incoming() {
-				ep := in.Accept() // Accept all endpoints
-				go func() {       // Process endpoints concurrently
-					switch ep := ep.(type) {
-					case electron.Sender:
-						m := amqp.NewMessageWith("hello yourself")
-						fmt.Printf("server %q sending %q\n", ep.Source(), m.Body())
-						ep.SendForget(m) // One-way send, client does not need to Accept.
-					case electron.Receiver:
-						if rm, err := ep.Receive(); check("server receive", err) {
-							fmt.Printf("server %q received %q\n", ep.Target(), rm.Message.Body())
-							err := rm.Accept() // Client is waiting for Accept.
-							check("accept message", err)
-						}
-					}
-				}()
-			}
-		}()
-	}
-}
-
-func startServer() (addr net.Addr) {
-	cont := electron.NewContainer("server")
-	if l, err := net.Listen("tcp", ""); check("listen", err) {
-		addr = l.Addr()
-		go runServer(cont, l)
-	}
-	return addr
-}
-
-// Connect to addr and send/receive a message.
-func client(addr net.Addr) {
-	if c, err := electron.Dial(addr.Network(), addr.String()); check("dial", err) {
-		defer c.Close(nil)
-		if s, err := c.Sender(electron.Target("target")); check("sender", err) {
-			fmt.Printf("client sending\n")
-			s.SendSync(amqp.NewMessageWith("hello")) // Send and wait for server to Accept()
-		}
-		if r, err := c.Receiver(electron.Source("source")); check("receiver", err) {
-			if rm, err := r.Receive(); err == nil {
-				fmt.Printf("client received %q\n", rm.Message.Body())
-			}
-		}
-	}
-}
-
-// Example client and server communicating via AMQP over a TCP/IP connection.
-//
-// Normally client and server would be separate processes.
-// For more realistic examples:
-//     https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
-//
-func Example_clientServer() {
-	addr := startServer()
-	client(addr)
-	// Output:
-	// client sending
-	// server "target" received "hello"
-	// server "source" sending "hello yourself"
-	// client received "hello yourself"
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/97815c34/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
new file mode 100644
index 0000000..3aa5892
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/example_client_server_test.go
@@ -0,0 +1,85 @@
+package electron_test
+
+import (
+	"fmt"
+	"net"
+	"qpid.apache.org/amqp"
+	"qpid.apache.org/electron"
+	"sync"
+)
+
+// Example Server that accepts a single Connection, Session and Receiver link
+// and prints messages received until the link closes.
+func Server(l net.Listener) {
+	cont := electron.NewContainer("server")
+	c, _ := cont.Accept(l) // Ignoring error handling
+	l.Close()              // This server only accepts one connection
+	// Process incoming endpoints till we get a Receiver link
+	var r electron.Receiver
+	for r == nil {
+		in := <-c.Incoming()
+		switch in := in.(type) {
+		case *electron.IncomingSession, *electron.IncomingConnection:
+			in.Accept() // Accept the incoming connection and session for the receiver
+		case *electron.IncomingReceiver:
+			in.SetCapacity(10)
+			in.SetPrefetch(true) // Automatic flow control for a buffer of 10 messages.
+			r = in.Accept().(electron.Receiver)
+		case nil:
+			return // Connection is closed
+		default:
+			in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
+		}
+	}
+	go func() { // Reject any further incoming endpoints
+		for in := range c.Incoming() {
+			in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
+		}
+	}()
+	// Receive messages till the Receiver closes
+	rm, err := r.Receive()
+	for ; err == nil; rm, err = r.Receive() {
+		fmt.Printf("server received: %q\n", rm.Message.Body())
+		rm.Accept() // Signal to the client that the message was accepted
+	}
+	fmt.Printf("server receiver closed: %v\n", err)
+}
+
+// Example client sending messages to a server running in a goroutine.
+//
+// Normally client and server would be separate processes. For more realistic and detailed examples:
+//     https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
+//
+func Example_clientServer() {
+	// NOTE: We are ignoring error handling in this example
+	l, _ := net.Listen("tcp", "") // Open a listening port for the server; the client connects to this port
+
+	// SERVER: start the server running in a separate goroutine
+	var waitServer sync.WaitGroup // We will wait for the server goroutine to finish before exiting
+	waitServer.Add(1)
+	go func() { // Run the server in the background
+		defer waitServer.Done()
+		Server(l)
+	}()
+
+	// CLIENT: Send messages to the server
+	addr := l.Addr()
+	c, _ := electron.Dial(addr.Network(), addr.String())
+	s, _ := c.Sender()
+	for i := 0; i < 3; i++ {
+		msg := fmt.Sprintf("hello %v", i)
+		// Send and wait for the Outcome from the server.
+		// Note: For higher throughput, use SendAsync() to send a stream of messages
+		// and process the returning stream of Outcomes concurrently.
+		s.SendSync(amqp.NewMessageWith(msg))
+	}
+	c.Close(nil) // Closing the connection will stop the server
+
+	waitServer.Wait() // Let the server finish
+
+	// Output:
+	// server received: "hello 0"
+	// server received: "hello 1"
+	// server received: "hello 2"
+	// server receiver closed: EOF
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org