You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/11/15 21:05:55 UTC
[31/31] qpid-proton git commit: Updated to 0.18.1 release
Updated to 0.18.1 release
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/6c48527c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/6c48527c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/6c48527c
Branch: refs/heads/go1
Commit: 6c48527c1514dfd5add06353eb0b9e43b3b8938e
Parents: 14f7ca5 42f67b6
Author: Alan Conway <ac...@redhat.com>
Authored: Wed Nov 15 16:04:15 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Wed Nov 15 16:04:15 2017 -0500
----------------------------------------------------------------------
electron/auth_test.go | 9 +++++++++
electron/connection.go | 8 ++++++++
electron/electron_test.go | 2 +-
electron/example_client_server_test.go | 24 ++++++++++++++++++------
proton/wrappers.go | 10 ++++++++++
5 files changed, 46 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6c48527c/electron/auth_test.go
----------------------------------------------------------------------
diff --cc electron/auth_test.go
index 73a9299,0000000..9eb48c0
mode 100644,000000..100644
--- a/electron/auth_test.go
+++ b/electron/auth_test.go
@@@ -1,124 -1,0 +1,133 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "fmt"
+ "io/ioutil"
+ "os"
+ "os/exec"
+ "path/filepath"
+ "strings"
+ "testing"
+)
+
+func testAuthClientServer(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (got connectionSettings, err error) {
+ client, server := newClientServerOpts(t, copts, sopts)
+ defer closeClientServer(client, server)
+
+ go func() {
+ for in := range server.Incoming() {
+ switch in := in.(type) {
+ case *IncomingConnection:
+ got = connectionSettings{user: in.User(), virtualHost: in.VirtualHost()}
+ }
+ in.Accept()
+ }
+ }()
+
+ err = client.Sync()
+ return
+}
+
+func TestAuthAnonymous(t *testing.T) {
+ fatalIf(t, configureSASL())
+ got, err := testAuthClientServer(t,
+ []ConnectionOption{User("fred"), VirtualHost("vhost"), SASLAllowInsecure(true)},
+ []ConnectionOption{SASLAllowedMechs("ANONYMOUS"), SASLAllowInsecure(true)})
+ fatalIf(t, err)
+ errorIf(t, checkEqual(connectionSettings{user: "anonymous", virtualHost: "vhost"}, got))
+}
+
+func TestAuthPlain(t *testing.T) {
++ if !SASLExtended() {
++ t.Skip()
++ }
+ fatalIf(t, configureSASL())
+ got, err := testAuthClientServer(t,
+ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("xxx"))},
+ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
+ fatalIf(t, err)
+ errorIf(t, checkEqual(connectionSettings{user: "fred@proton"}, got))
+}
+
+func TestAuthBadPass(t *testing.T) {
++ if !SASLExtended() {
++ t.Skip()
++ }
+ fatalIf(t, configureSASL())
+ _, err := testAuthClientServer(t,
+ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("fred@proton"), Password([]byte("yyy"))},
+ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
+ if err == nil {
+ t.Error("Expected auth failure for bad pass")
+ }
+}
+
+func TestAuthBadUser(t *testing.T) {
++ if !SASLExtended() {
++ t.Skip()
++ }
+ fatalIf(t, configureSASL())
+ _, err := testAuthClientServer(t,
+ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN"), User("foo@bar"), Password([]byte("yyy"))},
+ []ConnectionOption{SASLAllowInsecure(true), SASLAllowedMechs("PLAIN")})
+ if err == nil {
+ t.Error("Expected auth failure for bad user")
+ }
+}
+
+var confDir string
+var confErr error
+
+func configureSASL() error {
+ if confDir != "" || confErr != nil {
+ return confErr
+ }
+ confDir, confErr = ioutil.TempDir("", "")
+ if confErr != nil {
+ return confErr
+ }
+
+ GlobalSASLConfigDir(confDir)
+ GlobalSASLConfigName("test")
+ conf := filepath.Join(confDir, "test.conf")
+
+ db := filepath.Join(confDir, "proton.sasldb")
+ cmd := exec.Command("saslpasswd2", "-c", "-p", "-f", db, "-u", "proton", "fred")
+ cmd.Stdin = strings.NewReader("xxx") // Password
+ if out, err := cmd.CombinedOutput(); err != nil {
+ confErr = fmt.Errorf("saslpasswd2 failed: %s\n%s", err, out)
+ return confErr
+ }
+ confStr := "sasldb_path: " + db + "\nmech_list: EXTERNAL DIGEST-MD5 SCRAM-SHA-1 CRAM-MD5 PLAIN ANONYMOUS\n"
+ if err := ioutil.WriteFile(conf, []byte(confStr), os.ModePerm); err != nil {
+ confErr = fmt.Errorf("write conf file %s failed: %s", conf, err)
+ }
+ return confErr
+}
+
+func TestMain(m *testing.M) {
+ status := m.Run()
+ if confDir != "" {
+ _ = os.RemoveAll(confDir)
+ }
+ os.Exit(status)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6c48527c/electron/connection.go
----------------------------------------------------------------------
diff --cc electron/connection.go
index 267ee1e,0000000..2749b2b
mode 100644,000000..100644
--- a/electron/connection.go
+++ b/electron/connection.go
@@@ -1,413 -1,0 +1,421 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+// #include <proton/disposition.h>
+import "C"
+
+import (
+ "net"
+ "qpid.apache.org/proton"
+ "sync"
+ "time"
+)
+
+// Settings associated with a Connection.
+type ConnectionSettings interface {
+ // Authenticated user name associated with the connection.
+ User() string
+
+ // The AMQP virtual host name for the connection.
+ //
+ // Optional, useful when the server has multiple names and provides different
+ // service based on the name the client uses to connect.
+ //
+ // By default it is set to the DNS host name that the client uses to connect,
+ // but it can be set to something different at the client side with the
+ // VirtualHost() option.
+ //
+ // Returns error if the connection fails to authenticate.
+ VirtualHost() string
+
+ // Heartbeat is the maximum delay between sending frames that the remote peer
+ // has requested of us. If the interval expires an empty "heartbeat" frame
+ // will be sent automatically to keep the connection open.
+ Heartbeat() time.Duration
+}
+
+// Connection is an AMQP connection, created by a Container.
+type Connection interface {
+ Endpoint
+ ConnectionSettings
+
+ // Sender opens a new sender on the DefaultSession.
+ Sender(...LinkOption) (Sender, error)
+
+ // Receiver opens a new Receiver on the DefaultSession().
+ Receiver(...LinkOption) (Receiver, error)
+
+ // DefaultSession() returns a default session for the connection. It is opened
+ // on the first call to DefaultSession and returned on subsequent calls.
+ DefaultSession() (Session, error)
+
+ // Session opens a new session.
+ Session(...SessionOption) (Session, error)
+
+ // Container for the connection.
+ Container() Container
+
+ // Disconnect the connection abruptly with an error.
+ Disconnect(error)
+
+ // Wait waits for the connection to be disconnected.
+ Wait() error
+
+ // WaitTimeout is like Wait but returns Timeout if the timeout expires.
+ WaitTimeout(time.Duration) error
+
+ // Incoming returns a channel for incoming endpoints opened by the remote peer.
+ // See the Incoming interface for more detail.
+ //
+ // Note: this channel will first return an *IncomingConnection for the
+ // connection itself which allows you to look at security information and
+ // decide whether to Accept() or Reject() the connection. Then it will return
+ // *IncomingSession, *IncomingSender and *IncomingReceiver as they are opened
+ // by the remote end.
+ //
+ // Note 2: you must receiving from Incoming() and call Accept/Reject to avoid
+ // blocking electron event loop. Normally you would run a loop in a goroutine
+ // to handle incoming types that interest and Accept() those that don't.
+ Incoming() <-chan Incoming
+}
+
+type connectionSettings struct {
+ user, virtualHost string
+ heartbeat time.Duration
+}
+
+func (c connectionSettings) User() string { return c.user }
+func (c connectionSettings) VirtualHost() string { return c.virtualHost }
+func (c connectionSettings) Heartbeat() time.Duration { return c.heartbeat }
+
+// ConnectionOption can be passed when creating a connection to configure various options
+type ConnectionOption func(*connection)
+
+// User returns a ConnectionOption sets the user name for a connection
+func User(user string) ConnectionOption {
+ return func(c *connection) {
+ c.user = user
+ c.pConnection.SetUser(user)
+ }
+}
+
+// VirtualHost returns a ConnectionOption to set the AMQP virtual host for the connection.
+// Only applies to outbound client connection.
+func VirtualHost(virtualHost string) ConnectionOption {
+ return func(c *connection) {
+ c.virtualHost = virtualHost
+ c.pConnection.SetHostname(virtualHost)
+ }
+}
+
+// Password returns a ConnectionOption to set the password used to establish a
+// connection. Only applies to outbound client connection.
+//
+// The connection will erase its copy of the password from memory as soon as it
+// has been used to authenticate. If you are concerned about paswords staying in
+// memory you should never store them as strings, and should overwrite your
+// copy as soon as you are done with it.
+//
+func Password(password []byte) ConnectionOption {
+ return func(c *connection) { c.pConnection.SetPassword(password) }
+}
+
+// Server returns a ConnectionOption to put the connection in server mode for incoming connections.
+//
+// A server connection will do protocol negotiation to accept a incoming AMQP
+// connection. Normally you would call this for a connection created by
+// net.Listener.Accept()
+//
+func Server() ConnectionOption {
+ return func(c *connection) { c.engine.Server(); c.server = true; AllowIncoming()(c) }
+}
+
+// AllowIncoming returns a ConnectionOption to enable incoming endpoints, see
+// Connection.Incoming() This is automatically set for Server() connections.
+func AllowIncoming() ConnectionOption {
+ return func(c *connection) { c.incoming = make(chan Incoming) }
+}
+
+// Parent returns a ConnectionOption that associates the Connection with it's Container
+// If not set a connection will create its own default container.
+func Parent(cont Container) ConnectionOption {
+ return func(c *connection) { c.container = cont.(*container) }
+}
+
+type connection struct {
+ endpoint
+ connectionSettings
+
+ defaultSessionOnce, closeOnce sync.Once
+
+ container *container
+ conn net.Conn
+ server bool
+ incoming chan Incoming
+ handler *handler
+ engine *proton.Engine
+ pConnection proton.Connection
+
+ defaultSession Session
+}
+
+// NewConnection creates a connection with the given options.
+func NewConnection(conn net.Conn, opts ...ConnectionOption) (*connection, error) {
+ c := &connection{
+ conn: conn,
+ }
+ c.handler = newHandler(c)
+ var err error
+ c.engine, err = proton.NewEngine(c.conn, c.handler.delegator)
+ if err != nil {
+ return nil, err
+ }
+ c.pConnection = c.engine.Connection()
+ for _, set := range opts {
+ set(c)
+ }
+ if c.container == nil {
+ c.container = NewContainer("").(*container)
+ }
+ c.pConnection.SetContainer(c.container.Id())
+ globalSASLInit(c.engine)
+
+ c.endpoint.init(c.engine.String())
+ go c.run()
+ return c, nil
+}
+
+func (c *connection) run() {
+ if !c.server {
+ c.pConnection.Open()
+ }
+ _ = c.engine.Run()
+ if c.incoming != nil {
+ close(c.incoming)
+ }
+ _ = c.closed(Closed)
+}
+
+func (c *connection) Close(err error) {
+ c.err.Set(err)
+ c.engine.Close(err)
+}
+
+func (c *connection) Disconnect(err error) {
+ c.err.Set(err)
+ c.engine.Disconnect(err)
+}
+
+func (c *connection) Session(opts ...SessionOption) (Session, error) {
+ var s Session
+ err := c.engine.InjectWait(func() error {
+ if c.Error() != nil {
+ return c.Error()
+ }
+ pSession, err := c.engine.Connection().Session()
+ if err == nil {
+ pSession.Open()
+ if err == nil {
+ s = newSession(c, pSession, opts...)
+ }
+ }
+ return err
+ })
+ return s, err
+}
+
+func (c *connection) Container() Container { return c.container }
+
+func (c *connection) DefaultSession() (s Session, err error) {
+ c.defaultSessionOnce.Do(func() {
+ c.defaultSession, err = c.Session()
+ })
+ if err == nil {
+ err = c.Error()
+ }
+ return c.defaultSession, err
+}
+
+func (c *connection) Sender(opts ...LinkOption) (Sender, error) {
+ if s, err := c.DefaultSession(); err == nil {
+ return s.Sender(opts...)
+ } else {
+ return nil, err
+ }
+}
+
+func (c *connection) Receiver(opts ...LinkOption) (Receiver, error) {
+ if s, err := c.DefaultSession(); err == nil {
+ return s.Receiver(opts...)
+ } else {
+ return nil, err
+ }
+}
+
+func (c *connection) Connection() Connection { return c }
+
+func (c *connection) Wait() error { return c.WaitTimeout(Forever) }
+func (c *connection) WaitTimeout(timeout time.Duration) error {
+ _, err := timedReceive(c.done, timeout)
+ if err == Timeout {
+ return Timeout
+ }
+ return c.Error()
+}
+
+func (c *connection) Incoming() <-chan Incoming {
+ assert(c.incoming != nil, "Incoming() is only allowed for a Connection created with the Server() option: %s", c)
+ return c.incoming
+}
+
+type IncomingConnection struct {
+ incoming
+ connectionSettings
+ c *connection
+}
+
+func newIncomingConnection(c *connection) *IncomingConnection {
+ c.user = c.pConnection.Transport().User()
+ c.virtualHost = c.pConnection.RemoteHostname()
+ return &IncomingConnection{
+ incoming: makeIncoming(c.pConnection),
+ connectionSettings: c.connectionSettings,
+ c: c}
+}
+
+// AcceptConnection is like Accept() but takes ConnectionOption s
+// For example you can set the Heartbeat() for the accepted connection.
+func (in *IncomingConnection) AcceptConnection(opts ...ConnectionOption) Connection {
+ return in.accept(func() Endpoint {
+ for _, opt := range opts {
+ opt(in.c)
+ }
+ in.c.pConnection.Open()
+ return in.c
+ }).(Connection)
+}
+
+func (in *IncomingConnection) Accept() Endpoint {
+ return in.AcceptConnection()
+}
+
+func sasl(c *connection) proton.SASL { return c.engine.Transport().SASL() }
+
+// SASLEnable returns a ConnectionOption that enables SASL authentication.
+// Only required if you don't set any other SASL options.
+func SASLEnable() ConnectionOption { return func(c *connection) { sasl(c) } }
+
+// SASLAllowedMechs returns a ConnectionOption to set the list of allowed SASL
+// mechanisms.
+//
+// Can be used on the client or the server to restrict the SASL for a connection.
+// mechs is a space-separated list of mechanism names.
+//
+func SASLAllowedMechs(mechs string) ConnectionOption {
+ return func(c *connection) { sasl(c).AllowedMechs(mechs) }
+}
+
+// SASLAllowInsecure returns a ConnectionOption that allows or disallows clear
+// text SASL authentication mechanisms
+//
+// By default the SASL layer is configured not to allow mechanisms that disclose
+// the clear text of the password over an unencrypted AMQP connection. This specifically
+// will disallow the use of the PLAIN mechanism without using SSL encryption.
+//
+// This default is to avoid disclosing password information accidentally over an
+// insecure network.
+//
+func SASLAllowInsecure(b bool) ConnectionOption {
+ return func(c *connection) { sasl(c).SetAllowInsecureMechs(b) }
+}
+
+// Heartbeat returns a ConnectionOption that requests the maximum delay
+// between sending frames for the remote peer. If we don't receive any frames
+// within 2*delay we will close the connection.
+//
+func Heartbeat(delay time.Duration) ConnectionOption {
+ // Proton-C divides the idle-timeout by 2 before sending, so compensate.
+ return func(c *connection) { c.engine.Transport().SetIdleTimeout(2 * delay) }
+}
+
+// GlobalSASLConfigDir sets the SASL configuration directory for every
+// Connection created in this process. If not called, the default is determined
+// by your SASL installation.
+//
+// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
+//
+func GlobalSASLConfigDir(dir string) { globalSASLConfigDir = dir }
+
+// GlobalSASLConfigName sets the SASL configuration name for every Connection
+// created in this process. If not called the default is "proton-server".
+//
+// The complete configuration file name is
+// <sasl-config-dir>/<sasl-config-name>.conf
+//
+// You can set SASLAllowInsecure and SASLAllowedMechs on individual connections.
+//
+func GlobalSASLConfigName(dir string) { globalSASLConfigName = dir }
+
++// Do we support extended SASL negotiation?
++// All implementations of Proton support ANONYMOUS and EXTERNAL on both
++// client and server sides and PLAIN on the client side.
++//
++// Extended SASL implememtations use an external library (Cyrus SASL)
++// to support other mechanisms beyond these basic ones.
++func SASLExtended() bool { return proton.SASLExtended() }
++
+var (
+ globalSASLConfigName string
+ globalSASLConfigDir string
+)
+
+// TODO aconway 2016-09-15: Current pn_sasl C impl config is broken, so all we
+// can realistically offer is global configuration. Later if/when the pn_sasl C
+// impl is fixed we can offer per connection over-rides.
+func globalSASLInit(eng *proton.Engine) {
+ sasl := eng.Transport().SASL()
+ if globalSASLConfigName != "" {
+ sasl.ConfigName(globalSASLConfigName)
+ }
+ if globalSASLConfigDir != "" {
+ sasl.ConfigPath(globalSASLConfigDir)
+ }
+}
+
+// Dial is shorthand for using net.Dial() then NewConnection()
+// See net.Dial() for the meaning of the network, address arguments.
+func Dial(network, address string, opts ...ConnectionOption) (c Connection, err error) {
+ conn, err := net.Dial(network, address)
+ if err == nil {
+ c, err = NewConnection(conn, opts...)
+ }
+ return
+}
+
+// DialWithDialer is shorthand for using dialer.Dial() then NewConnection()
+// See net.Dial() for the meaning of the network, address arguments.
+func DialWithDialer(dialer *net.Dialer, network, address string, opts ...ConnectionOption) (c Connection, err error) {
+ conn, err := dialer.Dial(network, address)
+ if err == nil {
+ c, err = NewConnection(conn, opts...)
+ }
+ return
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6c48527c/electron/electron_test.go
----------------------------------------------------------------------
diff --cc electron/electron_test.go
index 294e952,0000000..4cd8453
mode 100644,000000..100644
--- a/electron/electron_test.go
+++ b/electron/electron_test.go
@@@ -1,546 -1,0 +1,546 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "fmt"
+ "net"
+ "path"
+ "qpid.apache.org/amqp"
+ "reflect"
+ "runtime"
+ "testing"
+ "time"
+)
+
+func fatalIf(t *testing.T, err error) {
+ if err != nil {
+ _, file, line, ok := runtime.Caller(1) // annotate with location of caller.
+ if ok {
+ _, file = path.Split(file)
+ }
+ t.Fatalf("(from %s:%d) %v", file, line, err)
+ }
+}
+
+func errorIf(t *testing.T, err error) {
+ if err != nil {
+ _, file, line, ok := runtime.Caller(1) // annotate with location of caller.
+ if ok {
+ _, file = path.Split(file)
+ }
+ t.Errorf("(from %s:%d) %v", file, line, err)
+ }
+}
+
+func checkEqual(want interface{}, got interface{}) error {
+ if !reflect.DeepEqual(want, got) {
+ return fmt.Errorf("%#v != %#v", want, got)
+ }
+ return nil
+}
+
+// Start a server, return listening addr and channel for incoming Connections.
+func newServer(t *testing.T, cont Container, opts ...ConnectionOption) (net.Addr, <-chan Connection) {
- listener, err := net.Listen("tcp", "")
++ listener, err := net.Listen("tcp4", "") // For systems with ipv6 disabled
+ fatalIf(t, err)
+ addr := listener.Addr()
+ ch := make(chan Connection)
+ go func() {
+ conn, err := listener.Accept()
+ c, err := cont.Connection(conn, append([]ConnectionOption{Server()}, opts...)...)
+ fatalIf(t, err)
+ ch <- c
+ }()
+ return addr, ch
+}
+
+// Open a client connection and session, return the session.
+func newClient(t *testing.T, cont Container, addr net.Addr, opts ...ConnectionOption) Session {
+ conn, err := net.Dial(addr.Network(), addr.String())
+ fatalIf(t, err)
+ c, err := cont.Connection(conn, opts...)
+ fatalIf(t, err)
+ sn, err := c.Session()
+ fatalIf(t, err)
+ return sn
+}
+
+// Return client and server ends of the same connection.
+func newClientServerOpts(t *testing.T, copts []ConnectionOption, sopts []ConnectionOption) (client Session, server Connection) {
+ addr, ch := newServer(t, NewContainer("test-server"), sopts...)
+ client = newClient(t, NewContainer("test-client"), addr, copts...)
+ return client, <-ch
+}
+
+// Return client and server ends of the same connection.
+func newClientServer(t *testing.T) (client Session, server Connection) {
+ return newClientServerOpts(t, nil, nil)
+}
+
+// Close client and server
+func closeClientServer(client Session, server Connection) {
+ client.Connection().Close(nil)
+ server.Close(nil)
+}
+
+// Send a message one way with a client sender and server receiver, verify ack.
+func TestClientSendServerReceive(t *testing.T) {
+ nLinks := 3
+ nMessages := 3
+
+ rchan := make(chan Receiver, nLinks)
+ client, server := newClientServer(t)
+ go func() {
+ for in := range server.Incoming() {
+ switch in := in.(type) {
+ case *IncomingReceiver:
+ in.SetCapacity(1)
+ in.SetPrefetch(false)
+ rchan <- in.Accept().(Receiver)
+ default:
+ in.Accept()
+ }
+ }
+ }()
+
+ defer func() { closeClientServer(client, server) }()
+
+ s := make([]Sender, nLinks)
+ for i := 0; i < nLinks; i++ {
+ var err error
+ s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ r := make([]Receiver, nLinks)
+ for i := 0; i < nLinks; i++ {
+ r[i] = <-rchan
+ }
+
+ for i := 0; i < nLinks; i++ {
+ for j := 0; j < nMessages; j++ {
+ // Client send
+ ack := make(chan Outcome, 1)
+ sendDone := make(chan struct{})
+ go func() {
+ defer close(sendDone)
+ m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
+ var err error
+ s[i].SendAsync(m, ack, "testing")
+ if err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // Server recieve
+ rm, err := r[i].Receive()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
+ t.Errorf("%#v != %#v", want, got)
+ }
+
+ // Should not be acknowledged on client yet
+ <-sendDone
+ select {
+ case <-ack:
+ t.Errorf("unexpected ack")
+ default:
+ }
+
+ // Server send ack
+ if err := rm.Reject(); err != nil {
+ t.Error(err)
+ }
+ // Client get ack.
+ if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected {
+ t.Error("unexpected ack: ", a.Status, a.Error, a.Value)
+ }
+ }
+ }
+}
+
+func TestClientReceiver(t *testing.T) {
+ nMessages := 3
+ client, server := newClientServer(t)
+ go func() {
+ for in := range server.Incoming() {
+ switch in := in.(type) {
+ case *IncomingSender:
+ s := in.Accept().(Sender)
+ go func() {
+ for i := int32(0); i < int32(nMessages); i++ {
+ out := s.SendSync(amqp.NewMessageWith(i))
+ if out.Error != nil {
+ t.Error(out.Error)
+ return
+ }
+ }
+ s.Close(nil)
+ }()
+ default:
+ in.Accept()
+ }
+ }
+ }()
+
+ r, err := client.Receiver(Source("foo"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ for i := int32(0); i < int32(nMessages); i++ {
+ rm, err := r.Receive()
+ if err != nil {
+ if err != Closed {
+ t.Error(err)
+ }
+ break
+ }
+ if err := rm.Accept(); err != nil {
+ t.Error(err)
+ }
+ if b, ok := rm.Message.Body().(int32); !ok || b != i {
+ t.Errorf("want %v, true got %v, %v", i, b, ok)
+ }
+ }
+ server.Close(nil)
+ client.Connection().Close(nil)
+}
+
+// Test timeout versions of waiting functions.
+func TestTimeouts(t *testing.T) {
+ var err error
+ rchan := make(chan Receiver, 1)
+ client, server := newClientServer(t)
+ go func() {
+ for i := range server.Incoming() {
+ switch i := i.(type) {
+ case *IncomingReceiver:
+ i.SetCapacity(1)
+ i.SetPrefetch(false)
+ rchan <- i.Accept().(Receiver) // Issue credit only on receive
+ default:
+ i.Accept()
+ }
+ }
+ }()
+ defer func() { closeClientServer(client, server) }()
+
+ // Open client sender
+ snd, err := client.Sender(Target("test"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ rcv := <-rchan
+
+ // Test send with timeout
+ short := time.Millisecond
+ long := time.Second
+ m := amqp.NewMessage()
+ if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout.
+ t.Error("want Timeout got", err)
+ }
+ if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout.
+ t.Error("want Timeout got", err)
+ }
+ // Test receive with timeout
+ if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
+ t.Error("want Timeout got", err)
+ }
+ // Test receive with timeout
+ if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
+ t.Error("want Timeout got", err)
+ }
+ // There is now a credit on the link due to receive
+ ack := make(chan Outcome)
+ snd.SendAsyncTimeout(m, ack, nil, short)
+ // Disposition should timeout
+ select {
+ case <-ack:
+ t.Errorf("want Timeout got %#v", ack)
+ case <-time.After(short):
+ }
+
+ // Receive and accept
+ rm, err := rcv.ReceiveTimeout(long)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err := rm.Accept(); err != nil {
+ t.Fatal(err)
+ }
+ // Sender get ack
+ if a := <-ack; a.Status != Accepted || a.Error != nil {
+ t.Errorf("want (accepted, nil) got %#v", a)
+ }
+}
+
+// A server that returns the opposite end of each client link via channels.
+type pairs struct {
+ t *testing.T
+ client Session
+ server Connection
+ rchan chan Receiver
+ schan chan Sender
+ capacity int
+ prefetch bool
+}
+
+func newPairs(t *testing.T, capacity int, prefetch bool) *pairs {
+ p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
+ p.client, p.server = newClientServer(t)
+ go func() {
+ for i := range p.server.Incoming() {
+ switch i := i.(type) {
+ case *IncomingReceiver:
+ i.SetCapacity(capacity)
+ i.SetPrefetch(prefetch)
+ p.rchan <- i.Accept().(Receiver)
+ case *IncomingSender:
+ p.schan <- i.Accept().(Sender)
+ default:
+ i.Accept()
+ }
+ }
+ }()
+ return p
+}
+
+func (p *pairs) close() {
+ closeClientServer(p.client, p.server)
+}
+
+// Return a client sender and server receiver
+func (p *pairs) senderReceiver() (Sender, Receiver) {
+ snd, err := p.client.Sender()
+ fatalIf(p.t, err)
+ rcv := <-p.rchan
+ return snd, rcv
+}
+
+// Return a client receiver and server sender
+func (p *pairs) receiverSender() (Receiver, Sender) {
+ rcv, err := p.client.Receiver()
+ fatalIf(p.t, err)
+ snd := <-p.schan
+ return rcv, snd
+}
+
+type result struct {
+ label string
+ err error
+ value interface{}
+}
+
+func (r result) String() string { return fmt.Sprintf("%v(%v)", r.err, r.label) }
+
+func doSend(snd Sender, results chan result) {
+ err := snd.SendSync(amqp.NewMessage()).Error
+ results <- result{"send", err, nil}
+}
+
+func doReceive(rcv Receiver, results chan result) {
+ msg, err := rcv.Receive()
+ results <- result{"receive", err, msg}
+}
+
+func doDisposition(ack <-chan Outcome, results chan result) {
+ results <- result{"disposition", (<-ack).Error, nil}
+}
+
+// Senders get credit immediately if receivers have prefetch set
+func TestSendReceivePrefetch(t *testing.T) {
+ pairs := newPairs(t, 1, true)
+ s, r := pairs.senderReceiver()
+ s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should not block for credit.
+ if _, err := r.Receive(); err != nil {
+ t.Error(err)
+ }
+}
+
+// Senders do not get credit till Receive() if receivers don't have prefetch
+func TestSendReceiveNoPrefetch(t *testing.T) {
+ pairs := newPairs(t, 1, false)
+ s, r := pairs.senderReceiver()
+ done := make(chan struct{}, 1)
+ go func() {
+ s.SendAsyncTimeout(amqp.NewMessage(), nil, nil, time.Second) // Should block for credit.
+ close(done)
+ }()
+ select {
+ case <-done:
+ t.Errorf("send should be blocked on credit")
+ default:
+ if _, err := r.Receive(); err != nil {
+ t.Error(err)
+ } else {
+ <-done
+ } // Should be unblocked now
+ }
+}
+
+// Test that closing Links interrupts blocked link functions.
+func TestLinkCloseInterrupt(t *testing.T) {
+ want := amqp.Error{Name: "x", Description: "all bad"}
+ pairs := newPairs(t, 1, false)
+ results := make(chan result) // Collect expected errors
+
+ // Note closing the link does not interrupt Send() calls, the AMQP spec says
+ // that deliveries can be settled after the link is closed.
+
+ // Receiver.Close() interrupts Receive()
+ snd, rcv := pairs.senderReceiver()
+ go doReceive(rcv, results)
+ rcv.Close(want)
+ if r := <-results; want != r.err {
+ t.Errorf("want %#v got %#v", want, r)
+ }
+
+ // Remote Sender.Close() interrupts Receive()
+ snd, rcv = pairs.senderReceiver()
+ go doReceive(rcv, results)
+ snd.Close(want)
+ if r := <-results; want != r.err {
+ t.Errorf("want %#v got %#v", want, r)
+ }
+}
+
+// Test closing the server end of a connection.
+func TestConnectionCloseInterrupt1(t *testing.T) {
+ want := amqp.Error{Name: "x", Description: "bad"}
+ pairs := newPairs(t, 1, true)
+ results := make(chan result) // Collect expected errors
+
+ // Connection.Close() interrupts Send, Receive, Disposition.
+ snd, rcv := pairs.senderReceiver()
+ go doSend(snd, results)
+
+ if _, err := rcv.Receive(); err != nil {
+ t.Error("receive", err)
+ }
+ rcv, snd = pairs.receiverSender()
+ go doReceive(rcv, results)
+
+ snd, rcv = pairs.senderReceiver()
+ ack := snd.SendWaitable(amqp.NewMessage())
+ if _, err := rcv.Receive(); err != nil {
+ t.Error("receive", err)
+ }
+ go doDisposition(ack, results)
+
+ pairs.server.Close(want)
+ for i := 0; i < 3; i++ {
+ if r := <-results; want != r.err {
+ t.Errorf("want %v got %v", want, r)
+ }
+ }
+}
+
+// Test closing the client end of the connection.
+func TestConnectionCloseInterrupt2(t *testing.T) {
+ want := amqp.Error{Name: "x", Description: "bad"}
+ pairs := newPairs(t, 1, true)
+ results := make(chan result) // Collect expected errors
+
+ // Connection.Close() interrupts Send, Receive, Disposition.
+ snd, rcv := pairs.senderReceiver()
+ go doSend(snd, results)
+ if _, err := rcv.Receive(); err != nil {
+ t.Error("receive", err)
+ }
+
+ rcv, snd = pairs.receiverSender()
+ go doReceive(rcv, results)
+
+ snd, rcv = pairs.senderReceiver()
+ ack := snd.SendWaitable(amqp.NewMessage())
+ go doDisposition(ack, results)
+
+ pairs.client.Connection().Close(want)
+ for i := 0; i < 3; i++ {
+ if r := <-results; want != r.err {
+ t.Errorf("want %v got %v", want, r.err)
+ }
+ }
+}
+
+func heartbeat(c Connection) time.Duration {
+ return c.(*connection).engine.Transport().RemoteIdleTimeout()
+}
+
+func TestHeartbeat(t *testing.T) {
+ client, server := newClientServerOpts(t,
+ []ConnectionOption{Heartbeat(102 * time.Millisecond)},
+ nil)
+ defer closeClientServer(client, server)
+
+ var serverHeartbeat time.Duration
+
+ go func() {
+ for in := range server.Incoming() {
+ switch in := in.(type) {
+ case *IncomingConnection:
+ serverHeartbeat = in.Heartbeat()
+ in.AcceptConnection(Heartbeat(101 * time.Millisecond))
+ default:
+ in.Accept()
+ }
+ }
+ }()
+
+ // Freeze the server to stop it sending heartbeats.
+ unfreeze := make(chan bool)
+ defer close(unfreeze)
+ freeze := func() error { return server.(*connection).engine.Inject(func() { <-unfreeze }) }
+
+ fatalIf(t, client.Sync())
+ errorIf(t, checkEqual(101*time.Millisecond, heartbeat(client.Connection())))
+ errorIf(t, checkEqual(102*time.Millisecond, serverHeartbeat))
+ errorIf(t, client.Connection().Error())
+
+ // Freeze the server for less than a heartbeat
+ fatalIf(t, freeze())
+ time.Sleep(50 * time.Millisecond)
+ unfreeze <- true
+ // Make sure server is still responding.
+ s, err := client.Sender()
+ errorIf(t, err)
+ errorIf(t, s.Sync())
+
+ // Freeze the server till the client times out the connection
+ fatalIf(t, freeze())
+ select {
+ case <-client.Done():
+ if amqp.ResourceLimitExceeded != client.Error().(amqp.Error).Name {
+ t.Error("bad timeout error:", client.Error())
+ }
+ case <-time.After(400 * time.Millisecond):
+ t.Error("connection failed to time out")
+ }
+
+ unfreeze <- true // Unfreeze the server
+ <-server.Done()
+ if amqp.ResourceLimitExceeded != server.Error().(amqp.Error).Name {
+ t.Error("bad timeout error:", server.Error())
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6c48527c/electron/example_client_server_test.go
----------------------------------------------------------------------
diff --cc electron/example_client_server_test.go
index 3aa5892,0000000..385c865
mode 100644,000000..100644
--- a/electron/example_client_server_test.go
+++ b/electron/example_client_server_test.go
@@@ -1,85 -1,0 +1,97 @@@
+package electron_test
+
+import (
+ "fmt"
++ "log"
+ "net"
+ "qpid.apache.org/amqp"
+ "qpid.apache.org/electron"
+ "sync"
+)
+
+// Example Server that accepts a single Connection, Session and Receiver link
+// and prints messages received until the link closes.
+func Server(l net.Listener) {
+ cont := electron.NewContainer("server")
- c, _ := cont.Accept(l) // Ignoring error handling
- l.Close() // This server only accepts one connection
++ c, err := cont.Accept(l)
++ if err != nil {
++ log.Fatal(err)
++ }
++ l.Close() // This server only accepts one connection
+ // Process incoming endpoints till we get a Receiver link
+ var r electron.Receiver
+ for r == nil {
+ in := <-c.Incoming()
+ switch in := in.(type) {
+ case *electron.IncomingSession, *electron.IncomingConnection:
+ in.Accept() // Accept the incoming connection and session for the receiver
+ case *electron.IncomingReceiver:
+ in.SetCapacity(10)
+ in.SetPrefetch(true) // Automatic flow control for a buffer of 10 messages.
+ r = in.Accept().(electron.Receiver)
+ case nil:
+ return // Connection is closed
+ default:
+ in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
+ }
+ }
+ go func() { // Reject any further incoming endpoints
+ for in := range c.Incoming() {
+ in.Reject(amqp.Errorf("example-server", "unexpected endpoint %v", in))
+ }
+ }()
+ // Receive messages till the Receiver closes
+ rm, err := r.Receive()
+ for ; err == nil; rm, err = r.Receive() {
+ fmt.Printf("server received: %q\n", rm.Message.Body())
+ rm.Accept() // Signal to the client that the message was accepted
+ }
+ fmt.Printf("server receiver closed: %v\n", err)
+}
+
+// Example client sending messages to a server running in a goroutine.
+//
+// Normally client and server would be separate processes. For more realistic and detailed examples:
+// https://github.com/apache/qpid-proton/blob/master/examples/go/README.md
+//
+func Example_clientServer() {
- // NOTE: We ignoring error handling in this example
- l, _ := net.Listen("tcp", "") // Open a listening port for server, client connect to this port
++ l, err := net.Listen("tcp", "127.0.0.1:0") // tcp4 so example will work on ipv6-disabled platforms
++ if err != nil {
++ log.Fatal(err)
++ }
+
+ // SERVER: start the server running in a separate goroutine
+ var waitServer sync.WaitGroup // We will wait for the server goroutine to finish before exiting
+ waitServer.Add(1)
+ go func() { // Run the server in the background
+ defer waitServer.Done()
+ Server(l)
+ }()
+
+ // CLIENT: Send messages to the server
+ addr := l.Addr()
- c, _ := electron.Dial(addr.Network(), addr.String())
- s, _ := c.Sender()
++ c, err := electron.Dial(addr.Network(), addr.String())
++ if err != nil {
++ log.Fatal(err)
++ }
++ s, err := c.Sender()
++ if err != nil {
++ log.Fatal(err)
++ }
+ for i := 0; i < 3; i++ {
+ msg := fmt.Sprintf("hello %v", i)
+ // Send and wait for the Outcome from the server.
+ // Note: For higher throughput, use SendAsync() to send a stream of messages
+ // and process the returning stream of Outcomes concurrently.
+ s.SendSync(amqp.NewMessageWith(msg))
+ }
+ c.Close(nil) // Closing the connection will stop the server
+
+ waitServer.Wait() // Let the server finish
+
+ // Output:
+ // server received: "hello 0"
+ // server received: "hello 1"
+ // server received: "hello 2"
+ // server receiver closed: EOF
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/6c48527c/proton/wrappers.go
----------------------------------------------------------------------
diff --cc proton/wrappers.go
index 879ad53,0000000..09f3e65
mode 100644,000000..100644
--- a/proton/wrappers.go
+++ b/proton/wrappers.go
@@@ -1,450 -1,0 +1,460 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// This file contains special-case wrapper functions or wrappers that don't follow
+// the pattern of genwrap.go.
+
+package proton
+
+//#include <proton/codec.h>
+//#include <proton/connection.h>
+//#include <proton/delivery.h>
+//#include <proton/event.h>
+//#include <proton/link.h>
+//#include <proton/link.h>
+//#include <proton/object.h>
+//#include <proton/sasl.h>
+//#include <proton/session.h>
+//#include <proton/transport.h>
+//#include <stdlib.h>
+import "C"
+
+import (
+ "fmt"
+ "qpid.apache.org/amqp"
+ "reflect"
+ "time"
+ "unsafe"
+)
+
+// TODO aconway 2015-05-05: Documentation for generated types.
+
+// CHandle holds an unsafe.Pointer to a proton C struct, the C type depends on the
+// Go type implementing this interface. For low level, at-your-own-risk use only.
+type CHandle interface {
+ // CPtr returns the unsafe C pointer, equivalent to a C void*.
+ CPtr() unsafe.Pointer
+}
+
+// Incref increases the refcount of a proton value, which prevents the
+// underlying C struct being freed until you call Decref().
+//
+// It can be useful to "pin" a proton value in memory while it is in use by
+// goroutines other than the event loop goroutine. For example if you Incref() a
+// Link, the underlying object is not freed when the link is closed, so means
+// other goroutines can continue to safely use it as an index in a map or inject
+// it into the event loop goroutine. There will of course be an error if you try
+// to use a link after it is closed, but not a segmentation fault.
+func Incref(c CHandle) {
+ if p := c.CPtr(); p != nil {
+ C.pn_incref(p)
+ }
+}
+
+// Decref decreases the refcount of a proton value, freeing the underlying C
+// struct if this is the last reference. Only call this if you previously
+// called Incref() for this value.
+func Decref(c CHandle) {
+ if p := c.CPtr(); p != nil {
+ C.pn_decref(p)
+ }
+}
+
+// Event is an AMQP protocol event.
+type Event struct {
+ pn *C.pn_event_t
+ eventType EventType
+ connection Connection
+ transport Transport
+ session Session
+ link Link
+ delivery Delivery
+ injecter Injecter
+}
+
+func makeEvent(pn *C.pn_event_t, injecter Injecter) Event {
+ return Event{
+ pn: pn,
+ eventType: EventType(C.pn_event_type(pn)),
+ connection: Connection{C.pn_event_connection(pn)},
+ transport: Transport{C.pn_event_transport(pn)},
+ session: Session{C.pn_event_session(pn)},
+ link: Link{C.pn_event_link(pn)},
+ delivery: Delivery{C.pn_event_delivery(pn)},
+ injecter: injecter,
+ }
+}
+func (e Event) IsNil() bool { return e.eventType == EventType(0) }
+func (e Event) Type() EventType { return e.eventType }
+func (e Event) Connection() Connection { return e.connection }
+func (e Event) Transport() Transport { return e.transport }
+func (e Event) Session() Session { return e.session }
+func (e Event) Link() Link { return e.link }
+func (e Event) Delivery() Delivery { return e.delivery }
+func (e Event) String() string { return e.Type().String() }
+
+// Injecter should not be used in a handler function, but it can be passed to
+// other goroutines (via a channel or to a goroutine started by handler
+// functions) to let them inject functions back into the handlers goroutine.
+func (e Event) Injecter() Injecter { return e.injecter }
+
+// Data is an intermediate form of decoded AMQP data.
+type Data struct{ pn *C.pn_data_t }
+
+func (d Data) Free() { C.pn_data_free(d.pn) }
+func (d Data) CPtr() unsafe.Pointer { return unsafe.Pointer(d.pn) }
+func (d Data) Clear() { C.pn_data_clear(d.pn) }
+func (d Data) Rewind() { C.pn_data_rewind(d.pn) }
+func (d Data) Next() { C.pn_data_next(d.pn) }
+func (d Data) Error() error { return PnError(C.pn_data_error(d.pn)) }
+func (d Data) Empty() bool { return C.pn_data_size(d.pn) == 0 }
+
+func (d Data) String() string {
+ str := C.pn_string(C.CString(""))
+ defer C.pn_free(unsafe.Pointer(str))
+ C.pn_inspect(unsafe.Pointer(d.pn), str)
+ return C.GoString(C.pn_string_get(str))
+}
+
+// Unmarshal the value of d into value pointed at by ptr, see amqp.Unmarshal() for details
+func (d Data) Unmarshal(ptr interface{}) error {
+ d.Rewind()
+ d.Next()
+ err := amqp.UnmarshalUnsafe(d.CPtr(), ptr)
+ return err
+}
+
+// Marshal the value v into d, see amqp.Marshal() for details
+func (d Data) Marshal(v interface{}) error {
+ d.Clear()
+ return amqp.MarshalUnsafe(v, d.CPtr())
+}
+
+// State holds the state flags for an AMQP endpoint.
+type State byte
+
+const (
+ SLocalUninit State = C.PN_LOCAL_UNINIT
+ SLocalActive = C.PN_LOCAL_ACTIVE
+ SLocalClosed = C.PN_LOCAL_CLOSED
+ SRemoteUninit = C.PN_REMOTE_UNINIT
+ SRemoteActive = C.PN_REMOTE_ACTIVE
+ SRemoteClosed = C.PN_REMOTE_CLOSED
+)
+
+// Has is True if bits & state is non 0.
+func (s State) Has(bits State) bool { return s&bits != 0 }
+
+func (s State) LocalUninit() bool { return s.Has(SLocalUninit) }
+func (s State) LocalActive() bool { return s.Has(SLocalActive) }
+func (s State) LocalClosed() bool { return s.Has(SLocalClosed) }
+func (s State) RemoteUninit() bool { return s.Has(SRemoteUninit) }
+func (s State) RemoteActive() bool { return s.Has(SRemoteActive) }
+func (s State) RemoteClosed() bool { return s.Has(SRemoteClosed) }
+
+// Return a State containig just the local flags
+func (s State) Local() State { return State(s & C.PN_LOCAL_MASK) }
+
+// Return a State containig just the remote flags
+func (s State) Remote() State { return State(s & C.PN_REMOTE_MASK) }
+
+// Endpoint is the common interface for Connection, Link and Session.
+type Endpoint interface {
+ // State is the open/closed state.
+ State() State
+ // Open an endpoint.
+ Open()
+ // Close an endpoint.
+ Close()
+ // Condition holds a local error condition.
+ Condition() Condition
+ // RemoteCondition holds a remote error condition.
+ RemoteCondition() Condition
+ // Human readable name
+ String() string
+ // Human readable endpoint type "sender-link", "session" etc.
+ Type() string
+}
+
+// CloseError sets an error condition (if err != nil) on an endpoint and closes
+// the endpoint if not already closed
+func CloseError(e Endpoint, err error) {
+ if err != nil && !e.Condition().IsSet() {
+ e.Condition().SetError(err)
+ }
+ e.Close()
+}
+
+// EndpointError returns the remote error if there is one, the local error if not
+// nil if there is no error.
+func EndpointError(e Endpoint) error {
+ err := e.RemoteCondition().Error()
+ if err == nil {
+ err = e.Condition().Error()
+ }
+ return err
+}
+
+const (
+ Received uint64 = C.PN_RECEIVED
+ Accepted = C.PN_ACCEPTED
+ Rejected = C.PN_REJECTED
+ Released = C.PN_RELEASED
+ Modified = C.PN_MODIFIED
+)
+
+// SettleAs is equivalent to d.Update(disposition); d.Settle()
+func (d Delivery) SettleAs(disposition uint64) {
+ d.Update(disposition)
+ d.Settle()
+}
+
+// Accept accepts and settles a delivery.
+func (d Delivery) Accept() { d.SettleAs(Accepted) }
+
+// Reject rejects and settles a delivery
+func (d Delivery) Reject() { d.SettleAs(Rejected) }
+
+// Release releases and settles a delivery
+// If delivered is true the delivery count for the message will be increased.
+func (d Delivery) Release(delivered bool) {
+ if delivered {
+ d.SettleAs(Modified)
+ } else {
+ d.SettleAs(Released)
+ }
+}
+
+type DeliveryTag struct{ pn C.pn_delivery_tag_t }
+
+func (t DeliveryTag) String() string { return C.GoStringN(t.pn.start, C.int(t.pn.size)) }
+
+func (l Link) Recv(buf []byte) int {
+ if len(buf) == 0 {
+ return 0
+ }
+ return int(C.pn_link_recv(l.pn, (*C.char)(unsafe.Pointer(&buf[0])), C.size_t(len(buf))))
+}
+
+func (l Link) SendBytes(bytes []byte) int {
+ return int(C.pn_link_send(l.pn, cPtr(bytes), cLen(bytes)))
+}
+
+func pnTag(tag string) C.pn_delivery_tag_t {
+ bytes := []byte(tag)
+ return C.pn_dtag(cPtr(bytes), cLen(bytes))
+}
+
+func (l Link) Delivery(tag string) Delivery {
+ return Delivery{C.pn_delivery(l.pn, pnTag(tag))}
+}
+
+func (l Link) Connection() Connection { return l.Session().Connection() }
+
+// Human-readable link description including name, source, target and direction.
+func (l Link) String() string {
+ switch {
+ case l.IsNil():
+ return fmt.Sprintf("<nil-link>")
+ case l.IsSender():
+ return fmt.Sprintf("%s(%s->%s)", l.Name(), l.Source().Address(), l.Target().Address())
+ default:
+ return fmt.Sprintf("%s(%s<-%s)", l.Name(), l.Target().Address(), l.Source().Address())
+ }
+}
+
+func (l Link) Type() string {
+ if l.IsSender() {
+ return "sender-link"
+ } else {
+ return "receiver-link"
+ }
+}
+
+// IsDrain calls pn_link_get_drain(), it conflicts with pn_link_drain() under the normal mapping.
+func (l Link) IsDrain() bool {
+ return bool(C.pn_link_get_drain(l.pn))
+}
+
+func cPtr(b []byte) *C.char {
+ if len(b) == 0 {
+ return nil
+ }
+ return (*C.char)(unsafe.Pointer(&b[0]))
+}
+
+func cLen(b []byte) C.size_t {
+ return C.size_t(len(b))
+}
+
+func (s Session) Sender(name string) Link {
+ cname := C.CString(name)
+ defer C.free(unsafe.Pointer(cname))
+ return Link{C.pn_sender(s.pn, cname)}
+}
+
+func (s Session) Receiver(name string) Link {
+ cname := C.CString(name)
+ defer C.free(unsafe.Pointer(cname))
+ return Link{C.pn_receiver(s.pn, cname)}
+}
+
+func (t Transport) String() string {
+ return fmt.Sprintf("(Transport)(%p)", t.CPtr())
+}
+
+// Unique (per process) string identifier for a connection, useful for debugging.
+func (c Connection) String() string {
+ // Use the transport address to match the default transport logs from PN_TRACE.
+ return fmt.Sprintf("(Connection)(%p)", c.Transport().CPtr())
+}
+
+func (c Connection) Type() string {
+ return "connection"
+}
+
+// Head functions don't follow the normal naming conventions so missed by the generator.
+
+func (c Connection) LinkHead(s State) Link {
+ return Link{C.pn_link_head(c.pn, C.pn_state_t(s))}
+}
+
+func (c Connection) SessionHead(s State) Session {
+ return Session{C.pn_session_head(c.pn, C.pn_state_t(s))}
+}
+
+func (c Connection) Links(state State) (links []Link) {
+ for l := c.LinkHead(state); !l.IsNil(); l = l.Next(state) {
+ links = append(links, l)
+ }
+ return
+}
+
+func (c Connection) Sessions(state State) (sessions []Session) {
+ for s := c.SessionHead(state); !s.IsNil(); s = s.Next(state) {
+ sessions = append(sessions, s)
+ }
+ return
+}
+
+// SetPassword takes []byte not string because it is impossible to erase a string
+// from memory reliably. Proton will not keep the password in memory longer than
+// needed, the caller should overwrite their copy on return.
+//
+// The password must not contain embedded nul characters, a trailing nul is ignored.
+func (c Connection) SetPassword(password []byte) {
+ if len(password) == 0 || password[len(password)-1] != 0 {
+ password = append(password, 0) // Proton requires a terminating null.
+ }
+ C.pn_connection_set_password(c.pn, (*C.char)(unsafe.Pointer(&password[0])))
+}
+
+func (s Session) String() string {
+ return fmt.Sprintf("(Session)(%p)", s.pn) // TODO aconway 2016-09-12: should print channel number.
+}
+
+func (s Session) Type() string { return "session" }
+
+// Error returns an instance of amqp.Error or nil.
+func (c Condition) Error() error {
+ if c.IsNil() || !c.IsSet() {
+ return nil
+ }
+ return amqp.Error{Name: c.Name(), Description: c.Description()}
+}
+
+// Set a Go error into a condition.
+// If it is not an amqp.Condition use the error type as name, error string as description.
+func (c Condition) SetError(err error) {
+ if err != nil {
+ if cond, ok := err.(amqp.Error); ok {
+ c.SetName(cond.Name)
+ c.SetDescription(cond.Description)
+ } else {
+ c.SetName(reflect.TypeOf(err).Name())
+ c.SetDescription(err.Error())
+ }
+ }
+}
+
+func (c Connection) Session() (Session, error) {
+ s := Session{C.pn_session(c.pn)}
+ if s.IsNil() {
+ return s, Connection(c).Error()
+ }
+ return s, nil
+}
+
+// pnTime converts Go time.Time to Proton millisecond Unix time.
+//
+// Note: t.isZero() is converted to C.pn_timestamp_t(0) and vice-versa. These
+// are used as "not set" sentinel values by the Go and Proton APIs, so it is
+// better to conserve the "zeroness" even though they don't represent the same
+// time instant.
+//
+func pnTime(t time.Time) (pnt C.pn_timestamp_t) {
+ if !t.IsZero() {
+ pnt = C.pn_timestamp_t(t.Unix()*1000 + int64(t.Nanosecond())/int64(time.Millisecond))
+ }
+ return
+}
+
+// goTime converts a pn_timestamp_t to a Go time.Time.
+//
+// Note: C.pn_timestamp_t(0) is converted to a zero time.Time and
+// vice-versa. These are used as "not set" sentinel values by the Go and Proton
+// APIs, so it is better to conserve the "zeroness" even though they don't
+// represent the same time instant.
+//
+func goTime(pnt C.pn_timestamp_t) (t time.Time) {
+ if pnt != 0 {
+ t = time.Unix(int64(pnt/1000), int64(pnt%1000)*int64(time.Millisecond))
+ }
+ return
+}
+
+// Special treatment for Transport.Head, return value is unsafe.Pointer not string
+func (t Transport) Head() unsafe.Pointer {
+ return unsafe.Pointer(C.pn_transport_head(t.pn))
+}
+
+// Special treatment for Transport.Tail, return value is unsafe.Pointer not string
+func (t Transport) Tail() unsafe.Pointer {
+ return unsafe.Pointer(C.pn_transport_tail(t.pn))
+}
+
+// Special treatment for Transport.Push, takes []byte instead of char*, size
+func (t Transport) Push(bytes []byte) int {
+ return int(C.pn_transport_push(t.pn, (*C.char)(unsafe.Pointer(&bytes[0])), C.size_t(len(bytes))))
+}
+
+// Get the SASL object for the transport.
+func (t Transport) SASL() SASL {
+ return SASL{C.pn_sasl(t.pn)}
+}
++
++// Do we support extended SASL negotiation?
++// All implementations of Proton support ANONYMOUS and EXTERNAL on both
++// client and server sides and PLAIN on the client side.
++//
++// Extended SASL implememtations use an external library (Cyrus SASL)
++// to support other mechanisms beyond these basic ones.
++func SASLExtended() bool {
++ return bool(C.pn_sasl_extended())
++}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org