You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/10/19 12:40:36 UTC
[50/50] qpid-proton git commit: Merge branch 'master' into go1 -
examples and doc
Merge branch 'master' into go1 - examples and doc
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/14f7ca56
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/14f7ca56
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/14f7ca56
Branch: refs/heads/go1
Commit: 14f7ca56f863f45393ced61f75861cf1e1a8e5be
Parents: 3d8368b 97815c3
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 19 13:39:16 2017 +0100
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 19 13:39:16 2017 +0100
----------------------------------------------------------------------
README.md | 20 ++++++-
electron/connection.go | 24 +++++---
electron/container.go | 3 +-
electron/ex_client_server_test.go | 81 ---------------------------
electron/example_client_server_test.go | 85 +++++++++++++++++++++++++++++
5 files changed, 120 insertions(+), 93 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/README.md
----------------------------------------------------------------------
diff --cc README.md
index 7415929,8b34f63..0977787
--- a/README.md
+++ b/README.md
@@@ -1,102 -1,43 +1,116 @@@
-Qpid Proton - AMQP messaging toolkit
-====================================
+# Qpid Go packages for AMQP
-Linux Build | Windows Build
-------------|--------------
-[![Linux Build Status](https://travis-ci.org/apache/qpid-proton.svg?branch=master)](https://travis-ci.org/apache/qpid-proton) | [![Windows Build Status](https://ci.appveyor.com/api/projects/status/github/apache/qpid-proton?branch=master&svg=true)](https://ci.appveyor.com/project/ke4qqq/qpid-proton/branch/master)
+These packages provide [Go](http://golang.org) support for sending and receiving
+AMQP messages in client or server applications. Reference documentation is
+available at: <http://godoc.org/?q=qpid.apache.org>
- They require the [proton-C library](http://qpid.apache.org/proton) to be installed.
- On many platforms it is avaialable pre-packaged, for example on Fedora
-Qpid Proton is a high-performance, lightweight messaging library. It can be
-used in the widest range of messaging applications, including brokers, client
-libraries, routers, bridges, proxies, and more. Proton makes it trivial to
-integrate with the AMQP 1.0 ecosystem from any platform, environment, or
-language
++They require the
++[proton-C library and header files](http://qpid.apache.org/proton) to be
++installed. On many platforms it is avaialable pre-packaged, for example on
++Fedora
- yum install qpid-proton-c-devel
-Features
---------
++ dnf install qpid-proton-c-devel
+
- - A flexible and capable reactive messaging API
- - Full control of AMQP 1.0 protocol semantics
- - Portable C implementation with bindings to popular languages
- - Peer-to-peer and brokered messaging
- - Secure communication via SSL and SASL
++If you built proton from source, you can set environment variables to find the
++built libraries and headers as follows:
+
-Universal - Proton is designed to scale both up and down. Equally suitable for
-simple clients or high-powered servers, it can be deployed in simple
-peer-to-peer configurations or as part of a global federated messaging network.
++ source <build-directory>/config.sh
+
-Embeddable - Proton is carefully written to be portable and cross platform. It
-has minimal dependencies, and it is architected to be usable with any threading
-model, as well as with non-threaded applications. These features make it
-uniquely suited for embedding messaging capabilities into existing software.
++If you have installed the library and headers in non-standard directories, then
++add them to the following environment variables:
+
-Standard - Built around the AMQP 1.0 messaging standard, Proton is not only
-ideal for building out your own messaging applications but also for connecting
-them to the broader ecosystem of AMQP 1.0-based messaging applications.
++ LD_LIBRARY_PATH # directory containing the library
++ LIBRARY_PATH # directory containing the library
++ C_INCLUDE_PATH # directory containing the proton/ subdirectory with header files
-Getting Started
----------------
+There are 3 packages:
-See the included INSTALL.md file for build and install instructions and the
-DEVELOPERS file for information on how to modify and test the library code
-itself.
+[qpid.apache.org/amqp](http://godoc.org/qpid.apache.org/amqp) provides functions
+to convert AMQP messages and data types to and from Go data types. Used by both
+the proton and electron packages to manage AMQP data.
-Please see http://qpid.apache.org/proton for a more info.
+[qpid.apache.org/electron](http://godoc.org/qpid.apache.org/electron) is a
+simple, concurrent-safe API for sending and receiving messages. It can be used
+with goroutines and channels to build concurrent AMQP clients and servers.
+
+[qpid.apache.org/proton](http://godoc.org/qpid.apache.org/proton) is an
+event-driven, concurrent-unsafe package that closely follows the proton C
+API. Most Go programmers will find the
+[electron](http://godoc.org/qpid.apache.org/electron) package easier to use.
+
+See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
+to help you get started.
+
+Feedback is encouraged at:
+
+- Email <pr...@qpid.apache.org>
+- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.
+
+### Why two APIs?
+
+The `proton` API is a direct mapping of the proton C library into Go. It is
+usable but not very natural for a Go programmer because it takes an
+*event-driven* approach and has no built-in support for concurrent
+use. `electron` uses `proton` internally but provides a more Go-like API that is
+safe to use from multiple concurrent goroutines.
+
+Go encourages programs to be structured as concurrent *goroutines* that
+communicate via *channels*. Go literature distinguishes between:
+
+- *concurrency*: "keeping track of things that could be done in parallel"
+- *parallelism*: "actually doing things in parallel on multiple CPUs or cores"
+
+A Go program expresses concurrency by starting goroutines for potentially
+concurrent tasks. The Go runtime schedules the activity of goroutines onto a
+small number (possibly one) of actual parallel executions.
+
+Even with no hardware parallelism, goroutine concurrency lets the Go runtime
+order unpredictable events like file descriptors being readable/writable,
+channels having data, timers firing etc. Go automatically takes care of
+switching out goroutines that block or sleep so it is normal to write code in
+terms of blocking calls.
+
+By contrast, event-driven programming is based on polling mechanisms like
+`select`, `poll` or `epoll`. These also dispatch unpredictably ordered events to
+a single thread or a small thread pool. However this requires a different style
+of programming: "event-driven" or "reactive" programming. Go developers call it
+"inside-out" programming. In an event-driven program blocking is a big problem
+as it consumes a scarce thread of execution, so actions that take time to
+complete have to be re-structured in terms of multiple events.
+
+The promise of Go is that you can express your program in concurrent, sequential
+terms and the Go runtime will turn it inside-out for you. You can start
+goroutines for all concurrent activities. They can loop forever or block for as
+long as they need waiting for timers, IO or any unpredictable event. Go will
+interleave and schedule them efficiently onto the available parallel hardware.
+
+For example: in the `electron` API, you can send a message and wait for it to be
+acknowledged in a single function. All the information about the message, why
+you sent it, and what to do when it is acknowledged can be held in local
+variables, all the code is in a simple sequence. Other goroutines in your
+program can be sending and receiving messages concurrently, they are not
+blocked.
+
+In the `proton` API, an event handler that sends a message must return
+*immediately*, it cannot block the event loop to wait for
+acknowledgement. Acknowledgement is a separate event, so the code for handling
+it is in a different event handler. Context information about the message has to
+be stored in some non-local variable that both functions can find. This makes
+the code harder to follow.
+
+The `proton` API is important because it is the foundation for the `electron`
+API, and may be useful for programs that need to be close to the original C
+library for some reason. However the `electron` API hides the event-driven
+details behind simple, sequential, concurrent-safe methods that can be called
+from arbitrary goroutines. Under the covers, data is passed through channels to
+dedicated `proton` goroutines so user goroutines can work concurrently with the
+proton event-loop.
+
+## New to Go?
+
+If you are new to Go then these are a good place to start:
+
+- [A Tour of Go](http://tour.golang.org)
+- [Effective Go](http://golang.org/doc/effective_go.html)
+
+Then look at the tools and docs at <http://golang.org> as you need them.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index 8f62491,0000000..267ee1e
mode 100644,000000..100644
--- a/electron/connection.go
+++ b/electron/connection.go
@@@ -1,405 -1,0 +1,413 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+// #include <proton/disposition.h>
+import "C"
+
+import (
+ "net"
+ "qpid.apache.org/proton"
+ "sync"
+ "time"
+)
+
+// Settings associated with a Connection.
+type ConnectionSettings interface {
+ // Authenticated user name associated with the connection.
+ User() string
+
+ // The AMQP virtual host name for the connection.
+ //
+ // Optional, useful when the server has multiple names and provides different
+ // service based on the name the client uses to connect.
+ //
+ // By default it is set to the DNS host name that the client uses to connect,
+ // but it can be set to something different at the client side with the
+ // VirtualHost() option.
+ //
+ // Returns error if the connection fails to authenticate.
+ VirtualHost() string
+
+ // Heartbeat is the maximum delay between sending frames that the remote peer
+ // has requested of us. If the interval expires an empty "heartbeat" frame
+ // will be sent automatically to keep the connection open.
+ Heartbeat() time.Duration
+}
+
+// Connection is an AMQP connection, created by a Container.
+type Connection interface {
+ Endpoint
+ ConnectionSettings
+
+ // Sender opens a new sender on the DefaultSession.
+ Sender(...LinkOption) (Sender, error)
+
+ // Receiver opens a new Receiver on the DefaultSession().
+ Receiver(...LinkOption) (Receiver, error)
+
+ // DefaultSession() returns a default session for the connection. It is opened
+ // on the first call to DefaultSession and returned on subsequent calls.
+ DefaultSession() (Session, error)
+
+ // Session opens a new session.
+ Session(...SessionOption) (Session, error)
+
+ // Container for the connection.
+ Container() Container
+
+ // Disconnect the connection abruptly with an error.
+ Disconnect(error)
+
+ // Wait waits for the connection to be disconnected.
+ Wait() error
+
+ // WaitTimeout is like Wait but returns Timeout if the timeout expires.
+ WaitTimeout(time.Duration) error
+
+ // Incoming returns a channel for incoming endpoints opened by the remote peer.
- // See the Incoming interface for more.
++ // See the Incoming interface for more detail.
+ //
- // Not receiving from Incoming() and calling Accept/Reject will block the
- // electron event loop. You should run a loop to handle the types that
- // interest you in a switch{} and and Accept() all others.
++ // Note: this channel will first return an *IncomingConnection for the
++ // connection itself which allows you to look at security information and
++ // decide whether to Accept() or Reject() the connection. Then it will return
++ // *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened
++ // by the remote end.
++ //
++ // Note 2: you must receiving from Incoming() and call Accept/Reject to avoid
++ // blocking electron event loop. Normally you would run a loop in a goroutine
++ // to handle incoming types that interest and Accept() those that don't.
+ Incoming() <-chan Incoming
+}
+
+type connectionSettings struct {
+ user, virtualHost string
+ heartbeat time.Duration
+}
+
+func (c connectionSettings) User() string { return c.user }
+func (c connectionSettings) VirtualHost() string { return c.virtualHost }
+func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
+
+// ConnectionOption can be passed when creating a connection to configure various options
+type ConnectionOption func(*connection)
+
+// User returns a ConnectionOption sets the user name for a connection
+func User(user string) ConnectionOption {
+ return func(c *connection) {
+ c.user = user
+ c.pConnection.SetUser(user)
+ }
+}
+
+// VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection.
+// Only applies to outbound client connection.
+func VirtualHost(virtualHost string) ConnectionOption {
+ return func(c *connection) {
+ c.virtualHost = virtualHost
+ c.pConnection.SetHostname(virtualHost)
+ }
+}
+
+// Password returns a ConnectionOption to set the password used to establish a
+// connection. Only applies to outbound client connection.
+//
+// The connection will erase its copy of the password from memory as soon as it
+// has been used to authenticate. If you are concerned about paswords staying in
+// memory you should never store them as strings, and should overwrite your
+// copy as soon as you are done with it.
+//
+func Password(password []byte) ConnectionOption {
+ return func(c *connection) { c.pConnection.SetPassword(password) }
+}
+
+// Server returns a ConnectionOption to put the connection in server mode for incoming connections.
+//
+// A server connection will do protocol negotiation to accept a incoming AMQP
+// connection. Normally you would call this for a connection created by
+// net.Listener.Accept()
+//
+func Server() ConnectionOption {
+ return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) }
+}
+
+// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
+// Connection.Incoming() This is automatically set for Server() connections.
+func AllowIncoming() ConnectionOption {
+ return func(c *connection) { c.incoming = make(chan Incoming) }
+}
+
+// Parent returns a ConnectionOption that associates the Connection with it's Container
+// If not set a connection will create its own default container.
+func Parent(cont Container) ConnectionOption {
+ return func(c *connection) { c.container = cont.(*container) }
+}
+
+type connection struct {
+ endpoint
+ connectionSettings
+
+ defaultSessionOnce, closeOnce sync.Once
+
+ container *container
+ conn net.Conn
+ server bool
+ incoming chan Incoming
+ handler *handler
+ engine *proton.Engine
+ pConnection proton.Connection
+
+ defaultSession Session
+}
+
+// NewConnection creates a connection with the given options.
+func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) {
+ c := &connection{
+ conn: conn,
+ }
+ c.handler = newHandler(c)
+ var err error
+ c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
+ if err != nil {
+ return nil, err
+ }
+ c.pConnection = c.engine.Connection()
+ for _, set := range opts {
+ set(c)
+ }
+ if c.container == nil {
+ c.container = NewContainer("").(*container)
+ }
+ c.pConnection.SetContainer(c.container.Id())
+ globalSASLInit(c.engine)
+
+ c.endpoint.init(c.engine.String())
+ go c.run()
+ return c, nil
+}
+
+func (c *connection) run() {
+ if !c.server {
+ c.pConnection.Open()
+ }
+ _ = c.engine.Run()
+ if c.incoming != nil {
+ close(c.incoming)
+ }
+ _ = c.closed(Closed)
+}
+
+func (c *connection) Close(err error) {
+ c.err.Set(err)
+ c.engine.Close(err)
+}
+
+func (c *connection) Disconnect(err error) {
+ c.err.Set(err)
+ c.engine.Disconnect(err)
+}
+
+func (c *connection) Session(opts ...SessionOption) (Session, error) {
+ var s Session
+ err := c.engine.InjectWait(func() error {
+ if c.Error() != nil {
+ return c.Error()
+ }
+ pSession, err := c.engine.Connection().Session()
+ if err == nil {
+ pSession.Open()
+ if err == nil {
+ s = newSession(c, pSession, opts...)
+ }
+ }
+ return err
+ })
+ return s, err
+}
+
+func (c *connection) Container() Container { return c.container }
+
+func (c *connection) DefaultSession() (s Session, err error) {
+ c.defaultSessionOnce.Do(func() {
+ c.defaultSession, err = c.Session()
+ })
+ if err == nil {
+ err = c.Error()
+ }
+ return c.defaultSession, err
+}
+
+func (c *connection) Sender(opts ...LinkOption) (Sender, error) {
+ if s, err := c.DefaultSession(); err == nil {
+ return s.Sender(opts...)
+ } else {
+ return nil, err
+ }
+}
+
+func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) {
+ if s, err := c.DefaultSession(); err == nil {
+ return s.Receiver(opts...)
+ } else {
+ return nil, err
+ }
+}
+
+func (c *connection) Connection() Connection { return c }
+
+func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
+func (c *connection) WaitTimeout(timeout time.Duration) error {
+ _, err := timedReceive(c.done, timeout)
+ if err == Timeout {
+ return Timeout
+ }
+ return c.Error()
+}
+
+func (c *connection) Incoming() <-chan Incoming {
+ assert(c.incoming != nil, "Incoming() is only allowed for a Connection created with the Server() option: %s", c)
+ return c.incoming
+}
+
+type IncomingConnection struct {
+ incoming
+ connectionSettings
+ c *connection
+}
+
+func newIncomingConnection(c *connection) *IncomingConnection {
+ c.user = c.pConnection.Transport().User()
+ c.virtualHost = c.pConnection.RemoteHostname()
+ return &IncomingConnection{
+ incoming: makeIncoming(c.pConnection),
+ connectionSettings: c.connectionSettings,
+ c: c}
+}
+
+// AcceptConnection is like Accept() but takes ConnectionOption s
+// For example you can set the Heartbeat() for the accepted connection.
+func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection {
+ return in.accept(func() Endpoint {
+ for _, opt := range opts {
+ opt(in.c)
+ }
+ in.c.pConnection.Open()
+ return in.c
+ }).(Connection)
+}
+
+func (in *IncomingConnection) Accept() Endpoint {
+ return in.AcceptConnection()
+}
+
+func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
+
+// SASLEnable returns a ConnectionOption that enables SASL authentication.
+// Only required if you don't set any other SASL options.
+func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } }
+
+// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL
+// mechanisms.
+//
+// Can be used on the client or the server to restrict the SASL for a connection.
+// mechs is a space-separated list of mechanism names.
+//
+func SASLAllowedMechs(mechs string) ConnectionOption {
+ return func(c *connection) { sasl(c).AllowedMechs(mechs) }
+}
+
+// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear
+// text SASL authentication mechanisms
+//
+// By default the SASL layer is configured not to allow mechanisms that disclose
+// the clear text of the password over an unencrypted AMQP connection. This specifically
+// will disallow the use of the PLAIN mechanism without using SSL encryption.
+//
+// This default is to avoid disclosing password information accidentally over an
+// insecure network.
+//
+func SASLAllowInsecure(b bool) ConnectionOption {
+ return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
+}
+
+// Heartbeat returns a ConnectionOption that requests the maximum delay
+// between sending frames for the remote peer. If we don't receive any frames
+// within 2*delay we will close the connection.
+//
+func Heartbeat(delay time.Duration) ConnectionOption {
+ // Proton-C divides the idle-timeout by 2 before sending, so compensate.
+ return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * delay) }
+}
+
+// GlobalSASLConfigDir sets the SASL configuration directory for every
+// Connection created in this process. If not called, the default is determined
+// by your SASL installation.
+//
+// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
+//
+func GlobalSASLConfigDir(dir string) { globalSASLConfigDir = dir }
+
+// GlobalSASLConfigName sets the SASL configuration name for every Connection
+// created in this process. If not called the default is "proton-server".
+//
+// The complete configuration file name is
+// <sasl-config-dir>/<sasl-config-name>.conf
+//
+// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
+//
+func GlobalSASLConfigName(dir string) { globalSASLConfigName = dir }
+
+var (
+ globalSASLConfigName string
+ globalSASLConfigDir string
+)
+
+// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we
+// can realistically offer is global configuration. Later if/when the pn_sasl C
+// impl is fixed we can offer per connection over-rides.
+func globalSASLInit(eng *proton.Engine) {
+ sasl := eng.Transport().SASL()
+ if globalSASLConfigName != "" {
+ sasl.ConfigName(globalSASLConfigName)
+ }
+ if globalSASLConfigDir != "" {
+ sasl.ConfigPath(globalSASLConfigDir)
+ }
+}
+
+// Dial is shorthand for using net.Dial() then NewConnection()
- func Dial(network, addr string, opts ...ConnectionOption) (c Connection, err error) {
- conn, err := net.Dial(network, addr)
++// See net.Dial() for the meaning of the network, address arguments.
++func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) {
++ conn, err := net.Dial(network, address)
+ if err == nil {
+ c, err = NewConnection(conn, opts...)
+ }
+ return
+}
+
+// DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
- func DialWithDialer(dialer *net.Dialer, network, addr string, opts ...ConnectionOption) (c Connection, err error) {
- conn, err := dialer.Dial(network, addr)
++// See net.Dial() for the meaning of the network, address arguments.
++func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error) {
++ conn, err := dialer.Dial(network, address)
+ if err == nil {
+ c, err = NewConnection(conn, opts...)
+ }
+ return
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/electron/container.go
----------------------------------------------------------------------
diff --cc electron/container.go
index efb24ff,0000000..7c19aa5
mode 100644,000000..100644
--- a/electron/container.go
+++ b/electron/container.go
@@@ -1,104 -1,0 +1,105 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "net"
+ "qpid.apache.org/proton"
+ "strconv"
+ "sync/atomic"
+)
+
+// Container is an AMQP container, it represents a single AMQP "application"
+// which can have multiple client or server connections.
+//
+// Each Container in a distributed AMQP application must have a unique
+// container-id which is applied to its connections.
+//
+// Create with NewContainer()
+//
+type Container interface {
+ // Id is a unique identifier for the container in your distributed application.
+ Id() string
+
+ // Connection creates a connection associated with this container.
+ Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error)
+
+ // Dial is shorthand for
+ // conn, err := net.Dial(); c, err := Connection(conn, opts...)
- Dial(network string, addr string, opts ...ConnectionOption) (Connection, error)
++ // See net.Dial() for the meaning of the network, address arguments.
++ Dial(network string, address string, opts ...ConnectionOption) (Connection, error)
+
+ // Accept is shorthand for:
+ // conn, err := l.Accept(); c, err := Connection(conn, append(opts, Server()...)
+ Accept(l net.Listener, opts ...ConnectionOption) (Connection, error)
+
+ // String returns Id()
+ String() string
+}
+
+type container struct {
+ id string
+ tagCounter uint64
+}
+
+func (cont *container) nextTag() string {
+ return strconv.FormatUint(atomic.AddUint64(&cont.tagCounter, 1), 32)
+}
+
+// NewContainer creates a new container. The id must be unique in your
+// distributed application, all connections created by the container
+// will have this container-id.
+//
+// If id == "" a random UUID will be generated for the id.
+func NewContainer(id string) Container {
+ if id == "" {
+ id = proton.UUID4().String()
+ }
+ cont := &container{id: id}
+ return cont
+}
+
+func (cont *container) Id() string { return cont.id }
+
+func (cont *container) String() string { return cont.Id() }
+
+func (cont *container) nextLinkName() string {
+ return cont.id + "@" + cont.nextTag()
+}
+
+func (cont *container) Connection(conn net.Conn, opts ...ConnectionOption) (Connection, error) {
+ return NewConnection(conn, append(opts, Parent(cont))...)
+}
+
+func (cont *container) Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) {
+ conn, err := net.Dial(network, address)
+ if err == nil {
+ c, err = cont.Connection(conn, opts...)
+ }
+ return
+}
+
+func (cont *container) Accept(l net.Listener, opts ...ConnectionOption) (c Connection, err error) {
+ conn, err := l.Accept()
+ if err == nil {
+ c, err = cont.Connection(conn, append(opts, Server())...)
+ }
+ return
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/14f7ca56/electron/example_client_server_test.go
----------------------------------------------------------------------
diff --cc electron/example_client_server_test.go
index 0000000,0000000..3aa5892
new file mode 100644
--- /dev/null
+++ b/electron/example_client_server_test.go
@@@ -1,0 -1,0 +1,85 @@@
++package electron_test
++
++import (
++ "fmt"
++ "net"
++ "qpid.apache.org/amqp"
++ "qpid.apache.org/electron"
++ "sync"
++)
++
++// Example Server that accepts a single Connection, Session and Receiver link
++// and prints messages received until the link closes.
++func Server(l net.Listener) {
++ cont := electron.NewContainer("server")
++ c, _ := cont.Accept(l) // Ignoring error handling
++ l.Close() // This server only accepts one connection
++ // Process incoming endpoints till we get a Receiver link
++ var r electron.Receiver
++ for r == nil {
++ in := <-c.Incoming()
++ switch in := in.(type) {
++ case *electron.IncomingSession, *electron.IncomingConnection:
++ in.Accept() // Accept the incoming connection and session for the receiver
++ case *electron.IncomingReceiver:
++ in.SetCapacity(10)
++ in.SetPrefetch(true) // Automatic flow control for a buffer of 10 messages.
++ r = in.Accept().(electron.Receiver)
++ case nil:
++ return // Connection is closed
++ default:
++ in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
++ }
++ }
++ go func() { // Reject any further incoming endpoints
++ for in := range c.Incoming() {
++ in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
++ }
++ }()
++ // Receive messages till the Receiver closes
++ rm, err := r.Receive()
++ for ; err == nil; rm, err = r.Receive() {
++ fmt.Printf("server received: %q\n", rm.Message.Body())
++ rm.Accept() // Signal to the client that the message was accepted
++ }
++ fmt.Printf("server receiver closed: %v\n", err)
++}
++
++// Example client sending messages to a server running in a goroutine.
++//
++// Normally client and server would be separate processes. For more realistic and detailed examples:
++// https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
++//
++func Example_clientServer() {
++ // NOTE: We ignoring error handling in this example
++ l, _ := net.Listen("tcp", "") // Open a listening port for server, client connect to this port
++
++ // SERVER: start the server running in a separate goroutine
++ var waitServer sync.WaitGroup // We will wait for the server goroutine to finish before exiting
++ waitServer.Add(1)
++ go func() { // Run the server in the background
++ defer waitServer.Done()
++ Server(l)
++ }()
++
++ // CLIENT: Send messages to the server
++ addr := l.Addr()
++ c, _ := electron.Dial(addr.Network(), addr.String())
++ s, _ := c.Sender()
++ for i := 0; i < 3; i++ {
++ msg := fmt.Sprintf("hello %v", i)
++ // Send and wait for the Outcome from the server.
++ // Note: For higher throughput, use SendAsync() to send a stream of messages
++ // and process the returning stream of Outcomes concurrently.
++ s.SendSync(amqp.NewMessageWith(msg))
++ }
++ c.Close(nil) // Closing the connection will stop the server
++
++ waitServer.Wait() // Let the server finish
++
++ // Output:
++ // server received: "hello 0"
++ // server received: "hello 1"
++ // server received: "hello 2"
++ // server receiver closed: EOF
++}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org