You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/10/23 16:36:04 UTC
[02/50] [abbrv] qpid-proton git commit: PROTON-827: Doc updates,
minor API cleanup.
PROTON-827: Doc updates, minor API cleanup.
Added DefaultSession, Sender()/Receiver() on Connection.
Improved documentation and package README
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/43c5cff3
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/43c5cff3
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/43c5cff3
Branch: refs/heads/go1
Commit: 43c5cff3274f4d4e9150202255ee9cafea83ed47
Parents: 3a48863
Author: Alan Conway <ac...@redhat.com>
Authored: Tue Sep 29 16:07:27 2015 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Tue Sep 29 16:12:14 2015 -0400
----------------------------------------------------------------------
examples/go/broker.go | 2 +-
examples/go/receive.go | 10 +---
examples/go/send.go | 10 +---
.../go/src/qpid.apache.org/proton/README.md | 12 ++++
.../proton/concurrent/connection.go | 62 +++++++++++++++++---
.../proton/concurrent/container.go | 4 +-
.../qpid.apache.org/proton/concurrent/doc.go | 22 ++++---
.../proton/concurrent/endpoint.go | 3 +-
.../proton/concurrent/messaging_test.go | 6 +-
.../proton/concurrent/receiver.go | 1 -
.../proton/concurrent/session.go | 5 +-
.../go/src/qpid.apache.org/proton/doc.go | 50 +++++++++++-----
.../go/src/qpid.apache.org/proton/engine.go | 7 +--
13 files changed, 134 insertions(+), 60 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/examples/go/broker.go
----------------------------------------------------------------------
diff --git a/examples/go/broker.go b/examples/go/broker.go
index 3f85e9e..47d0a76 100644
--- a/examples/go/broker.go
+++ b/examples/go/broker.go
@@ -81,7 +81,7 @@ func (b *broker) listen(addr string) (err error) {
if err != nil {
return err
}
- c, err := b.container.NewConnection(conn)
+ c, err := b.container.Connection(conn)
if err != nil {
return err
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/examples/go/receive.go
----------------------------------------------------------------------
diff --git a/examples/go/receive.go b/examples/go/receive.go
index a45ffe3..86244d7 100644
--- a/examples/go/receive.go
+++ b/examples/go/receive.go
@@ -73,19 +73,13 @@ func main() {
// Open a new connection
conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
util.ExitIf(err)
- c, err := container.NewConnection(conn)
+ c, err := container.Connection(conn)
util.ExitIf(err)
util.ExitIf(c.Open())
connections <- c // Save connection so we can Close() when main() ends
- // Create and open a session
- ss, err := c.NewSession()
- util.ExitIf(err)
- err = ss.Open()
- util.ExitIf(err)
-
// Create a Receiver using the path of the URL as the source address
- r, err := ss.Receiver(url.Path)
+ r, err := c.Receiver(url.Path)
util.ExitIf(err)
// Loop receiving messages and sending them to the main() goroutine
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/examples/go/send.go
----------------------------------------------------------------------
diff --git a/examples/go/send.go b/examples/go/send.go
index 7fa5416..edac2ae 100644
--- a/examples/go/send.go
+++ b/examples/go/send.go
@@ -76,20 +76,14 @@ func main() {
// Open a new connection
conn, err := net.Dial("tcp", url.Host) // Note net.URL.Host is actually "host:port"
util.ExitIf(err)
- c, err := container.NewConnection(conn)
+ c, err := container.Connection(conn)
util.ExitIf(err)
err = c.Open()
util.ExitIf(err)
connections = append(connections, c) // Save connection so it will be closed when main() ends
- // Create and open a session
- ss, err := c.NewSession()
- util.ExitIf(err)
- err = ss.Open()
- util.ExitIf(err)
-
// Create a Sender using the path of the URL as the AMQP address
- s, err := ss.Sender(url.Path)
+ s, err := c.Sender(url.Path)
util.ExitIf(err)
// Loop sending messages.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/README.md b/proton-c/bindings/go/src/qpid.apache.org/proton/README.md
new file mode 100644
index 0000000..ad57b47
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/README.md
@@ -0,0 +1,12 @@
+# Go binding for proton
+
+This is a [Go](http://golang.org) binding for proton.
+Package documentation is available at: <http://godoc.org/qpid.apache.org/proton>
+
+See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/go/README.md)
+for working examples and practical instructions on how to get started.
+
+Feedback is encouraged at:
+
+- Email <pr...@qpid.apache.org>
+- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go
index 9e82760..63fd3fc 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/connection.go
@@ -33,6 +33,19 @@ import (
type Connection interface {
Endpoint
+ // Sender opens a new sender on the DefaultSession.
+ //
+ // v can be a string, which is used as the Target address, or a SenderSettings
+ // struct containing more detailed settings.
+ Sender(v interface{}) (Sender, error)
+
+ // Receiver opens a new Receiver on the DefaultSession().
+ //
+ // v can be a string, which is used as the
+ // Source address, or a ReceiverSettings struct containing more detailed
+ // settings.
+ Receiver(v interface{}) (Receiver, error)
+
// Server puts the connection in server mode, must be called before Open().
//
// A server connection will do protocol negotiation to accept a incoming AMQP
@@ -45,9 +58,12 @@ type Connection interface {
// Must be called before Open().
Listen()
- // NewSession creates a new local session, you must call Session.Open()
- // to open it with the remote peer.
- NewSession() (s Session, err error)
+ // DefaultSession() returns a default session for the connection. It is opened
+ // on the first call to DefaultSession and returned on subsequent calls.
+ DefaultSession() (Session, error)
+
+ // Session opens a new session.
+ Session() (Session, error)
// Accept returns the next Endpoint (Session, Sender or Receiver) opened by
// the remote peer. It returns (nil, error) if the connection closes.
@@ -70,13 +86,13 @@ type Connection interface {
// Container for the connection.
Container() Container
- // Disconnect the connection abrubtly.
+ // Disconnect the connection abruptly with an error.
Disconnect(error)
}
type connection struct {
endpoint
- listenOnce sync.Once
+ listenOnce, defaultSessionOnce sync.Once
// Set before Open()
container *container
@@ -88,6 +104,8 @@ type connection struct {
engine *proton.Engine
err internal.FirstError
eConnection proton.Connection
+
+ defaultSession Session
}
func newConnection(conn net.Conn, cont *container) (*connection, error) {
@@ -129,18 +147,20 @@ func (c *connection) Disconnect(err error) {
}
}
-// FIXME aconway 2015-09-24: needed?
func (c *connection) closed(err error) {
// Call from another goroutine to initiate close without deadlock.
go c.Close(err)
}
-func (c *connection) NewSession() (Session, error) {
+func (c *connection) Session() (Session, error) {
var s Session
err := c.engine.InjectWait(func() error {
eSession, err := c.engine.Connection().Session()
if err == nil {
- s = newSession(c, eSession)
+ eSession.Open()
+ if err == nil {
+ s = newSession(c, eSession)
+ }
}
return err
})
@@ -165,3 +185,29 @@ func (c *connection) Accept() (Endpoint, error) {
}
func (c *connection) Container() Container { return c.container }
+
+func (c *connection) DefaultSession() (s Session, err error) {
+ c.defaultSessionOnce.Do(func() {
+ c.defaultSession, err = c.Session()
+ })
+ if err == nil {
+ err = c.Error()
+ }
+ return c.defaultSession, err
+}
+
+func (c *connection) Sender(v interface{}) (Sender, error) {
+ if s, err := c.DefaultSession(); err == nil {
+ return s.Sender(v)
+ } else {
+ return nil, err
+ }
+}
+
+func (c *connection) Receiver(v interface{}) (Receiver, error) {
+ if s, err := c.DefaultSession(); err == nil {
+ return s.Receiver(v)
+ } else {
+ return nil, err
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go
index 5c090e3..5edecfc 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/container.go
@@ -39,7 +39,7 @@ type Container interface {
// setting any Connection properties you need to set. Note the net.Conn
// can be an outgoing connection (e.g. made with net.Dial) or an incoming
// connection (e.g. made with net.Listener.Accept())
- NewConnection(conn net.Conn) (Connection, error)
+ Connection(conn net.Conn) (Connection, error)
}
type container struct {
@@ -66,6 +66,6 @@ func (cont *container) nextLinkName() string {
return cont.id + "@" + cont.linkNames.Next()
}
-func (cont *container) NewConnection(conn net.Conn) (Connection, error) {
+func (cont *container) Connection(conn net.Conn) (Connection, error) {
return newConnection(conn, cont)
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go
index 3e7756c..810a5da 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/doc.go
@@ -20,15 +20,23 @@ under the License.
/*
Package concurrent provides a procedural, concurrent Go API for exchanging AMQP
-messages.
+messages. You can write clients or servers using this API.
-AMPQ defines a credit-based scheme for flow control of messages over a
-link. Credit is the number of messages the receiver is willing to accept. The
-receiver gives credit to the sender. The sender can send messages without
-waiting for a response from the receiver until it runs out of credit, at which
-point it must wait for more credit to send more messages.
+Start by creating a Container with NewContainer. A Container represents a client
+or server application that can contain incoming or outgoing connections.
+
+You can create connections with the standard Go 'net' package using net.Dial or
+net.Listen. Create an AMQP connection over a net.Conn with
+Container.Connection() and open it with Connection.Open().
+
+AMQP sends messages over "links", each link has a Sender and Receiver
+end. Connection.Sender() and Connection.Receiver() allow you to create links to
+send and receive messages.
+
+You can also create an AMQP server connection by calling Connection.Listen()
+before calling Open() on the connection. You can then call Connection.Accept()
+after calling Connection.Open() to accept incoming sessions and links.
-See the documentation of Sender and Receiver for details of how this API uses credit.
*/
package concurrent
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go
index 717cac1..f647058 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/endpoint.go
@@ -37,7 +37,8 @@ var Closed = io.EOF
// Link, Sender and Receiver for details.
//
type Endpoint interface {
- // Open the endpoint.
+ // Open the local end of a remotely-initiated endpoint. You must Open()
+ // endpoints returned by Connection.Accept() before using them.
Open() error
// Close an endpoint and signal an error to the remote end if error != nil.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go
index aa806d7..0ee9f1a 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/messaging_test.go
@@ -41,7 +41,7 @@ func newServer(cont Container) (net.Addr, <-chan Connection) {
ch := make(chan Connection)
go func() {
conn, err := listener.Accept()
- c, err := cont.NewConnection(conn)
+ c, err := cont.Connection(conn)
panicIf(err)
c.Server()
c.Listen()
@@ -55,10 +55,10 @@ func newServer(cont Container) (net.Addr, <-chan Connection) {
func newClient(cont Container, addr net.Addr) Session {
conn, err := net.Dial(addr.Network(), addr.String())
panicIf(err)
- c, err := cont.NewConnection(conn)
+ c, err := cont.Connection(conn)
panicIf(err)
c.Open()
- sn, err := c.NewSession()
+ sn, err := c.Session()
panicIf(err)
panicIf(sn.Open())
return sn
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go
index ad033a6..5bcf9f2 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/receiver.go
@@ -190,7 +190,6 @@ func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, er
// Called in proton goroutine
func (r *receiver) handleDelivery(delivery proton.Delivery) {
- // FIXME aconway 2015-09-24: how can this happen if we are remote closed?
if r.eLink.State().RemoteClosed() {
localClose(r.eLink, r.eLink.RemoteCondition().Error())
return
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go
index ba09690..2f609be 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/concurrent/session.go
@@ -28,6 +28,8 @@ import (
//
type Session interface {
Endpoint
+
+ // Connection owning this session.
Connection() Connection
// Sender opens a new sender. v can be a string, which is used as the Target
@@ -63,8 +65,6 @@ func (s *session) Close(err error) {
s.engine().Inject(func() { localClose(s.eSession, err) })
}
-// NewSender create a link sending to target.
-// You must call snd.Open() before calling snd.Send().
func (s *session) Sender(v interface{}) (snd Sender, err error) {
var settings LinkSettings
switch v := v.(type) {
@@ -86,7 +86,6 @@ func (s *session) Sender(v interface{}) (snd Sender, err error) {
return
}
-// Receiver opens a receiving link.
func (s *session) Receiver(v interface{}) (rcv Receiver, err error) {
var settings ReceiverSettings
switch v := v.(type) {
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
index b175cf6..25b43af 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/doc.go
@@ -18,24 +18,46 @@ under the License.
*/
/*
-Package proton provides a Go binding for the Qpid proton AMQP library.
-AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/>
-Proton is an event-driven, concurrent-unsafe AMQP protocol library that allows
-you to send and receive messages using the standard AMQP concurrent protocol.
+Package proton is a Go binding for the Qpid Proton AMQP messaging toolkit (see
+http://qpid.apache.org/proton). It is a concurrent-unsafe, event-driven API that
+closely follows the Proton C API.
-For most tasks, consider using package `qpid.apache.org/proton/concurrent`. It
-provides a concurrent-safe API that is easier and more natural to use in Go.
+Package qpid.apache.org/proton/concurrent provides an alternative,
+concurrent-safe, procedural API. Most applications will find the concurrent API
+easier to use.
-The raw proton API is event-driven and not concurrent-safe. You implement a
-MessagingHandler event handler to react to AMQP protocol events. You must ensure
-that all events are handled in a single goroutine or that you serialize all all
-uses of the proton objects associated with a single connection using a lock.
-You can use channels to communicate between application goroutines and the
-event-handling goroutine, see type Event fro more detail.
+If you need direct access to the underlying proton library for some reason, this
+package provides it. The types in this package are simple wrappers for C
+pointers. They provide access to C functions as Go methods and do some trivial
+conversions, for example between Go string and C null-terminated char* strings.
-Package `qpid.apache.org/proton/concurrent` does all this for you and presents
-a simple concurrent-safe interface.
+Consult the C API documentation at http://qpid.apache.org/proton for more
+information about the types here. There is a 1-1 correspondence between C type
+pn_foo_t and Go type proton.Foo, and between C function
+
+ pn_foo_do_something(pn_foo_t*, ...)
+
+and Go method
+
+ func (proton.Foo) DoSomething(...)
+
+The proton.Engine type pumps data between a Go net.Conn connection and a
+proton.Connection goroutine that feeds events to a proton.MessagingHandler. See
+the proton.Engine documentation for more detail.
+
+EventHandler and MessagingHandler define event handling interfaces that you
+can implement to react to protocol events. MessagingHandler provides a somewhat
+simpler set of events and automates some common tasks for you.
+
+You must ensure that all events are handled in a single goroutine or that you
+serialize all uses of the proton objects associated with a single connection
+using a lock. You can use channels to communicate between application
+goroutines and the event-handling goroutine, see Engine documentation for more details.
+
+Package qpid.apache.org/proton/concurrent does all this for you and presents a
+simple concurrent-safe interface, for most applications you should use that
+instead.
*/
package proton
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/43c5cff3/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
index 3096280..63dc452 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/engine.go
@@ -76,10 +76,9 @@ func (b *bufferChan) buffer() []byte {
// functions (such as sending messages) are encoded and written to the
// net.Conn. Create a engine with NewEngine()
//
-// The proton protocol engine is single threaded (per connection). The Engine runs
-// proton in the goroutine that calls Engine.Run() and creates goroutines to feed
-// data to/from a net.Conn. You can create multiple Engines to handle multiple
-// connections concurrently.
+// The Engine runs a proton event loop in the goroutine that calls Engine.Run()
+// and creates goroutines to feed data to/from a net.Conn. You can create
+// multiple Engines to handle multiple connections concurrently.
//
// Methods on proton values defined in this package (Sessions, Links etc.) can
// only be called in the goroutine that executes the corresponding
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org