You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/10/23 16:36:50 UTC
[48/50] [abbrv] qpid-proton git commit: Merge branch 'master' into
go1 - 0.11 alpa
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/container.go
----------------------------------------------------------------------
diff --cc electron/container.go
index 0000000,0000000..7bbc4b0
new file mode 100644
--- /dev/null
+++ b/electron/container.go
@@@ -1,0 -1,0 +1,71 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "net"
++ "qpid.apache.org/internal"
++)
++
++// Container is an AMQP container, it represents a single AMQP "application". It
++// provides functions to create new Connections to remote containers.
++//
++// Create with NewContainer()
++//
++type Container interface {
++ // Id is a unique identifier for the container in your distributed application.
++ Id() string
++
++ // Connection creates a new AMQP Connection over the supplied net.Conn connection.
++ //
++ // You must call Connection.Open() on the returned Connection, after
++ // setting any Connection properties you need to set. Note the net.Conn
++ // can be an outgoing connection (e.g. made with net.Dial) or an incoming
++ // connection (e.g. made with net.Listener.Accept())
++ Connection(net.Conn, ...ConnectionSetting) (Connection, error)
++}
++
++// container is the Container implementation returned by NewContainer.
++type container struct {
++ id string // Container id, set once in NewContainer and returned by Id().
++ linkNames internal.IdCounter // Supplies the per-link suffix used by nextLinkName.
++}
++
++// NewContainer returns a Container identified by id. The id must be unique in
++// your distributed application; every connection created by the container
++// carries this container-id.
++//
++// An empty id is replaced by a randomly generated UUID.
++func NewContainer(id string) Container {
++	if id != "" {
++		return &container{id: id}
++	}
++	return &container{id: internal.UUID4().String()}
++}
++
++// Id returns the container id that was passed to, or generated by, NewContainer.
++func (cont *container) Id() string { return cont.id }
++
++// nextLinkName builds a link name of the form "<container-id>@<counter>",
++// taking the counter part from the container's linkNames counter.
++func (cont *container) nextLinkName() string {
++	prefix := cont.id + "@"
++	return prefix + cont.linkNames.Next()
++}
++
++// Connection implements Container.Connection by delegating to newConnection,
++// passing this container and the supplied settings.
++func (cont *container) Connection(conn net.Conn, setting ...ConnectionSetting) (Connection, error) {
++	return newConnection(
++		conn,
++		cont,
++		setting...,
++	)
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/doc.go
----------------------------------------------------------------------
diff --cc electron/doc.go
index 0000000,0000000..eaa6e7a
new file mode 100644
--- /dev/null
+++ b/electron/doc.go
@@@ -1,0 -1,0 +1,57 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++/*
++Package electron is a procedural, concurrent-safe Go library for AMQP messaging.
++You can write clients and servers using this library.
++
++Start by creating a Container with NewContainer. A Container represents a client
++or server application that can contain many incoming or outgoing connections.
++
++Create connections with the standard Go 'net' package using net.Dial or
++net.Listen. Create an AMQP connection over a net.Conn with
++Container.Connection() and open it with Connection.Open().
++
++AMQP sends messages over "links". Each link has a Sender end and a Receiver
++end. Connection.Sender() and Connection.Receiver() allow you to create links to
++Send() and Receive() messages.
++
++You can create an AMQP server connection by calling Connection.Server() and
++Connection.Listen() before calling Connection.Open(). A server connection can
++negotiate protocol security details and can accept incoming links opened from
++the remote end of the connection.
++*/
++package electron
++
++//#cgo LDFLAGS: -lqpid-proton
++import "C"
++
++// Just for package comment
++
++/* DEVELOPER NOTES
++
++There is a single proton.Engine per connection, each driving its own event-loop goroutine,
++and each with a 'handler'. Most state for a connection is maintained on the handler, and
++only accessed in the event-loop goroutine, so no locks are required.
++
++The handler sets up channels as needed to get or send data from user goroutines
++using electron types like Sender or Receiver. We also use Engine.Inject to inject
++actions into the event loop from user goroutines.
++
++*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/endpoint.go
----------------------------------------------------------------------
diff --cc electron/endpoint.go
index 0000000,0000000..745fd04
new file mode 100644
--- /dev/null
+++ b/electron/endpoint.go
@@@ -1,0 -1,0 +1,68 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "io"
++ "qpid.apache.org/internal"
++ "qpid.apache.org/proton"
++)
++
++// Closed is an alias for io.EOF. It is returned as an error when an endpoint
++// was closed cleanly, i.e. Endpoint.Error() == Closed means the endpoint was
++// closed without error.
++var Closed = io.EOF
++
++// Endpoint is the common interface for Connection, Session, Link, Sender and Receiver.
++//
++// Endpoints can be created locally or by the remote peer. You must Open() an
++// endpoint before you can use it. Some endpoints have additional Set*() methods
++// that must be called before Open() to take effect, see Connection, Session,
++// Link, Sender and Receiver for details.
++//
++type Endpoint interface {
++ // Close an endpoint and signal an error to the remote end if error != nil.
++ Close(error)
++
++ // String is a human readable identifier, useful for debugging and logging.
++ String() string
++
++ // Error returns nil if the endpoint is open, otherwise returns an error.
++ // Error() == Closed means the endpoint was closed without error.
++ Error() error
++
++ // Connection returns the Connection containing the endpoint.
++ Connection() Connection
++}
++
++// endpoint holds the state behind Endpoint.String() and Endpoint.Error().
++// It is meant to be embedded in concrete endpoint types.
++type endpoint struct {
++ err internal.ErrorHolder // Value reported by Error().
++ str string // Must be set by the value that embeds endpoint.
++}
++
++// String returns the identifier stored in str by the embedding value.
++func (e *endpoint) String() string { return e.str }
++
++// Error returns the error currently held by the endpoint's ErrorHolder.
++func (e *endpoint) Error() error { return e.err.Get() }
++
++// localClose closes ep locally with err, but only if it is still locally
++// active. Call it in the proton goroutine; the handler completes the close
++// when the remote end closes.
++func localClose(ep proton.Endpoint, err error) {
++	if !ep.State().LocalActive() {
++		return
++	}
++	proton.CloseError(ep, err)
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/handler.go
----------------------------------------------------------------------
diff --cc electron/handler.go
index 0000000,0000000..b518e42
new file mode 100644
--- /dev/null
+++ b/electron/handler.go
@@@ -1,0 -1,0 +1,158 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "qpid.apache.org/amqp"
++ "qpid.apache.org/proton"
++)
++
++// NOTE: methods in this file are called only in the proton goroutine unless otherwise indicated.
++
++// handler receives proton messaging events for one connection and keeps the
++// lookup tables that map proton objects to their electron counterparts.
++type handler struct {
++ delegator *proton.MessagingAdapter // Created in newHandler with the auto features disabled.
++ connection *connection // Connection this handler serves.
++ links map[proton.Link]Link // Filled by addLink, emptied by linkClosed.
++ sentMessages map[proton.Delivery]*sentMessage // Consulted on MSettled and MDisconnected.
++ sessions map[proton.Session]*session // Consulted on MLinkOpening, pruned on MSessionClosed.
++}
++
++// newHandler builds the handler for connection c with empty lookup tables and
++// a MessagingAdapter that delegates back to the handler.
++func newHandler(c *connection) *handler {
++	h := &handler{connection: c}
++	h.links = make(map[proton.Link]Link)
++	h.sentMessages = make(map[proton.Delivery]*sentMessage)
++	h.sessions = make(map[proton.Session]*session)
++
++	adapter := proton.NewMessagingAdapter(h)
++	h.delegator = adapter
++	// Disable auto features of MessagingAdapter, we do these ourselves.
++	adapter.Prefetch = 0
++	adapter.AutoAccept = false
++	adapter.AutoSettle = false
++	adapter.AutoOpen = false
++	return h
++}
++
++// internalError closes the connection with an amqp.InternalError condition
++// whose description is built from format and args.
++func (h *handler) internalError(format string, args ...interface{}) {
++	cause := amqp.Errorf(amqp.InternalError, format, args...)
++	proton.CloseError(h.connection.eConnection, cause)
++}
++
++// HandleMessagingEvent dispatches a proton messaging event to the electron
++// object it concerns. Per the note at the top of this file it runs in the
++// proton goroutine.
++//
++// Message, sendable and settled events go to the receiver, sender or
++// sentMessage registered for the link/delivery. Remotely opened sessions and
++// links are offered to the connection's accept callback. Close and disconnect
++// events record the error on the connection and notify affected endpoints.
++func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event) {
++	switch t {
++
++	case proton.MMessage:
++		if r, ok := h.links[e.Link()].(*receiver); ok {
++			r.message(e.Delivery())
++		} else {
++			h.internalError("no receiver for link %s", e.Link())
++		}
++
++	case proton.MSettled:
++		if sm := h.sentMessages[e.Delivery()]; sm != nil {
++			sm.settled(nil)
++		}
++
++	case proton.MSendable:
++		if s, ok := h.links[e.Link()].(*sender); ok {
++			s.sendable()
++		} else {
++			// This lookup is for a sender; the message used to say "receiver".
++			h.internalError("no sender for link %s", e.Link())
++		}
++
++	case proton.MSessionOpening:
++		if e.Session().State().LocalUninit() { // Remotely opened
++			incoming := &IncomingSession{h: h, pSession: e.Session()}
++			h.connection.accept(incoming)
++			if err := incoming.error(); err != nil {
++				proton.CloseError(e.Session(), err)
++			}
++		}
++
++	case proton.MSessionClosed:
++		err := proton.EndpointError(e.Session())
++		// linkClosed deletes from h.links; deleting during range is safe in Go.
++		for l := range h.links {
++			if l.Session() == e.Session() {
++				h.linkClosed(l, err)
++			}
++		}
++		delete(h.sessions, e.Session())
++
++	case proton.MLinkOpening:
++		l := e.Link()
++		if l.State().LocalActive() { // Already opened locally.
++			break
++		}
++		ss := h.sessions[l.Session()]
++		if ss == nil {
++			h.internalError("no session for link %s", e.Link())
++			break
++		}
++		var incoming Incoming
++		if l.IsReceiver() {
++			incoming = &IncomingReceiver{makeIncomingLink(ss, l)}
++		} else {
++			incoming = &IncomingSender{makeIncomingLink(ss, l)}
++		}
++		h.connection.accept(incoming)
++		if err := incoming.error(); err != nil {
++			proton.CloseError(l, err)
++		}
++
++	case proton.MLinkClosing:
++		e.Link().Close()
++
++	case proton.MLinkClosed:
++		h.linkClosed(e.Link(), proton.EndpointError(e.Link()))
++
++	case proton.MConnectionClosing:
++		h.connection.err.Set(e.Connection().RemoteCondition().Error())
++
++	case proton.MConnectionClosed:
++		h.connection.err.Set(Closed) // If no error already set, this is an orderly close.
++
++	case proton.MDisconnected:
++		h.connection.err.Set(e.Transport().Condition().Error())
++		// If err not set at this point (e.g. to Closed) then this is unexpected.
++		h.connection.err.Set(amqp.Errorf(amqp.IllegalState, "unexpected disconnect on %s", h.connection))
++
++		err := h.connection.Error()
++		for l := range h.links {
++			h.linkClosed(l, err)
++		}
++		for _, s := range h.sessions {
++			s.closed(err)
++		}
++		for _, sm := range h.sentMessages {
++			sm.settled(err)
++		}
++	}
++}
++
++// linkClosed notifies the electron Link registered for l that it has closed
++// with err and removes it from the handler. Unregistered links are ignored.
++func (h *handler) linkClosed(l proton.Link, err error) {
++	registered := h.links[l]
++	if registered == nil {
++		return
++	}
++	registered.closed(err)
++	delete(h.links, l)
++}
++
++// addLink registers ll as the electron Link for the proton link rl,
++// replacing any previous entry for rl.
++func (h *handler) addLink(rl proton.Link, ll Link) {
++ h.links[rl] = ll
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/link.go
----------------------------------------------------------------------
diff --cc electron/link.go
index 0000000,0000000..4bef53b
new file mode 100644
--- /dev/null
+++ b/electron/link.go
@@@ -1,0 -1,0 +1,247 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "qpid.apache.org/internal"
++ "qpid.apache.org/proton"
++)
++
++// Link is the common interface for AMQP links. Sender and Receiver provide
++// more methods for the sending or receiving end of a link respectively.
++type Link interface {
++ Endpoint
++
++ // Source address that messages are coming from.
++ Source() string
++
++ // Target address that messages are going to.
++ Target() string
++
++ // LinkName is a unique name for the link among links between the same
++ // containers in the same direction. By default generated automatically.
++ LinkName() string
++
++ // IsSender is true if this is the sending end of the link.
++ IsSender() bool
++
++ // IsReceiver is true if this is the receiving end of the link.
++ IsReceiver() bool
++
++ // SndSettle defines when the sending end of the link settles message delivery.
++ SndSettle() SndSettleMode
++
++ // RcvSettle defines when the receiving end of the link settles message delivery.
++ RcvSettle() RcvSettleMode
++
++ // Session containing the Link
++ Session() Session
++
++ // Called in event loop on closed event.
++ closed(err error)
++ // Called to open a link (local or accepted incoming link)
++ open()
++}
++
++// LinkSetting can be passed when creating a sender or receiver.
++// See functions that return LinkSetting for details.
++// Each setting is a function that modifies the link being created.
++type LinkSetting func(*link)
++
++// Source sets the address that messages are coming from.
++func Source(s string) LinkSetting { return func(l *link) { l.source = s } }
++
++// Target sets the address that messages are going to.
++func Target(s string) LinkSetting { return func(l *link) { l.target = s } }
++
++// LinkName sets the link name. If no name is set, localLink generates one
++// from the container.
++func LinkName(s string) LinkSetting { return func(l *link) { l.linkName = s } }
++
++// SndSettle sets the send settle mode.
++func SndSettle(m SndSettleMode) LinkSetting { return func(l *link) { l.sndSettle = m } }
++
++// RcvSettle sets the receive settle mode.
++func RcvSettle(m RcvSettleMode) LinkSetting { return func(l *link) { l.rcvSettle = m } }
++
++// SndSettleMode defines when the sending end of the link settles message delivery.
++// Its underlying type is proton.SndSettleMode.
++type SndSettleMode proton.SndSettleMode
++
++// Capacity sets the link capacity. Links created without this setting get
++// a capacity of 1.
++func Capacity(n int) LinkSetting { return func(l *link) { l.capacity = n } }
++
++// Prefetch sets a receiver's pre-fetch flag. Not relevant for a sender.
++func Prefetch(p bool) LinkSetting { return func(l *link) { l.prefetch = p } }
++
++// AtMostOnce selects "fire and forget" delivery: messages are sent without
++// waiting for acknowledgment, so they can be lost if there is a network
++// failure. Sets SndSettleMode=SndSettled and RcvSettleMode=RcvFirst.
++func AtMostOnce() LinkSetting {
++	return func(l *link) {
++		l.sndSettle = SndSettled
++		l.rcvSettle = RcvFirst
++	}
++}
++
++// AtLeastOnce asks for an acknowledgment of every message; an acknowledgment
++// means the message was definitely received. After a failure, unacknowledged
++// messages can be re-sent, in which case a message may be received twice.
++// Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst.
++func AtLeastOnce() LinkSetting {
++	return func(l *link) {
++		l.sndSettle = SndUnsettled
++		l.rcvSettle = RcvFirst
++	}
++}
++
++// Values for SndSettleMode.
++const (
++ // Messages are sent unsettled
++ SndUnsettled = SndSettleMode(proton.SndUnsettled)
++ // Messages are sent already settled
++ SndSettled = SndSettleMode(proton.SndSettled)
++ // Sender can send either unsettled or settled messages.
++ // NOTE(review): named SendMixed, unlike the Snd* prefix used by its siblings.
++ SendMixed = SndSettleMode(proton.SndMixed)
++)
++
++// RcvSettleMode defines when the receiving end of the link settles message delivery.
++// Its underlying type is proton.RcvSettleMode.
++type RcvSettleMode proton.RcvSettleMode
++
++// Values for RcvSettleMode.
++const (
++ // Receiver settles first.
++ RcvFirst = RcvSettleMode(proton.RcvFirst)
++ // Receiver waits for sender to settle before settling.
++ RcvSecond = RcvSettleMode(proton.RcvSecond)
++)
++
++// link holds the state shared by both ends of a link: the settings applied
++// via LinkSetting functions, the owning session and the underlying proton.Link.
++type link struct {
++ endpoint
++
++ // Link settings.
++ source string
++ target string
++ linkName string
++ isSender bool
++ sndSettle SndSettleMode
++ rcvSettle RcvSettleMode
++ capacity int
++ prefetch bool
++
++ session *session // Session that contains the link.
++ eLink proton.Link // Underlying proton link.
++ done chan struct{} // Closed when link is closed
++}
++
++// Accessors for the link settings; see the Link interface for their meaning.
++func (l *link) Source() string { return l.source }
++func (l *link) Target() string { return l.target }
++func (l *link) LinkName() string { return l.linkName }
++func (l *link) IsSender() bool { return l.isSender }
++func (l *link) IsReceiver() bool { return !l.isSender }
++func (l *link) SndSettle() SndSettleMode { return l.sndSettle }
++func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle }
++func (l *link) Session() Session { return l.session }
++func (l *link) Connection() Connection { return l.session.Connection() }
++
++// engine and handler reach through the session to its connection.
++func (l *link) engine() *proton.Engine { return l.session.connection.engine }
++func (l *link) handler() *handler { return l.session.connection.handler }
++
++// localLink sets up the fields of a locally created link and opens the
++// underlying proton.Link.
++//
++// Defaults (capacity 1, prefetch off) are applied first, then each setting in
++// order. If no link name was set, one is generated from the container. The
++// proton link is created as a sender or receiver on the session, configured
++// with the source/target addresses and settle modes, and opened.
++//
++// If the proton link cannot be created, the error is recorded on the link
++// and returned.
++func localLink(sn *session, isSender bool, setting ...LinkSetting) (link, error) {
++	l := link{
++		session:  sn,
++		isSender: isSender,
++		capacity: 1,
++		prefetch: false,
++		done:     make(chan struct{}),
++	}
++	for _, set := range setting {
++		set(&l)
++	}
++	if l.linkName == "" {
++		l.linkName = l.session.connection.container.nextLinkName()
++	}
++	if l.IsSender() {
++		l.eLink = l.session.eSession.Sender(l.linkName)
++	} else {
++		l.eLink = l.session.eSession.Receiver(l.linkName)
++	}
++	if l.eLink.IsNil() {
++		// Report the link by name: the link struct value has no String
++		// method and l.str is not set yet, so formatting l with %s would
++		// produce a field dump.
++		l.err.Set(internal.Errorf("cannot create link %s", l.linkName))
++		return l, l.err.Get()
++	}
++	l.eLink.Source().SetAddress(l.source)
++	l.eLink.Target().SetAddress(l.target)
++	l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
++	l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
++	l.str = l.eLink.String()
++	l.eLink.Open()
++	return l, nil
++}
++
++// incomingLink combines the incoming state with the link state for a link
++// opened by the remote peer; it is built by makeIncomingLink.
++type incomingLink struct {
++ incoming
++ link
++}
++
++// Set up a link from an incoming proton.Link.
++//
++// The link settings are copied from the remote values carried by eLink
++// (remote source/target addresses and remote settle modes). Capacity and
++// prefetch get the same defaults as a local link (1 and false).
++func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
++ l := incomingLink{
++ link: link{
++ session: sn,
++ isSender: eLink.IsSender(),
++ eLink: eLink,
++ source: eLink.RemoteSource().Address(),
++ target: eLink.RemoteTarget().Address(),
++ linkName: eLink.Name(),
++ sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
++ rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
++ capacity: 1,
++ prefetch: false,
++ done: make(chan struct{}),
++ },
++ }
++ // The human readable identifier comes from the proton link.
++ l.str = eLink.String()
++ return l
++}
++
++// Called in proton goroutine on closed or disconnected.
++// Records err on the link, falls back to Closed, then closes the done channel.
++// NOTE(review): the second Set relies on ErrorHolder keeping the first
++// error it was given — confirm against internal.ErrorHolder.
++func (l *link) closed(err error) {
++ l.err.Set(err)
++ l.err.Set(Closed) // If no error set, mark as closed.
++ close(l.done)
++}
++
++// Credit reports the credit on the underlying proton link. The value is read
++// inside a function passed to Engine.InjectWait, whose error is returned.
++// Not part of the Link interface but used by Sender and Receiver.
++func (l *link) Credit() (credit int, err error) {
++	readCredit := func() error {
++		credit = l.eLink.Credit()
++		return nil
++	}
++	err = l.engine().InjectWait(readCredit)
++	return credit, err
++}
++
++// Capacity returns the link capacity setting.
++// Not part of the Link interface but used by Sender and Receiver.
++func (l *link) Capacity() int { return l.capacity }
++
++// Close injects a local close of the proton link, carrying err, into the
++// connection's engine.
++func (l *link) Close(err error) {
++	closeLink := func() { localClose(l.eLink, err) }
++	l.engine().Inject(closeLink)
++}
++
++// open opens the underlying proton link; it implements Link.open.
++func (l *link) open() {
++ l.eLink.Open()
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/messaging_test.go
----------------------------------------------------------------------
diff --cc electron/messaging_test.go
index 0000000,0000000..36b0c24
new file mode 100644
--- /dev/null
+++ b/electron/messaging_test.go
@@@ -1,0 -1,0 +1,416 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "fmt"
++ "net"
++ "path"
++ "qpid.apache.org/amqp"
++ "runtime"
++ "testing"
++ "time"
++)
++
++// fatalIf fails the test immediately when err is non-nil. The failure message
++// is annotated with the file name and line of the caller.
++func fatalIf(t *testing.T, err error) {
++	if err == nil {
++		return
++	}
++	_, file, line, ok := runtime.Caller(1) // annotate with location of caller.
++	if ok {
++		_, base := path.Split(file)
++		file = base
++	}
++	t.Fatalf("(from %s:%d) %v", file, line, err)
++}
++
++// newServer starts a server and returns its listening address together with a
++// channel that delivers the single incoming Connection.
++//
++// A goroutine accepts one network connection, wraps it in a server-side AMQP
++// Connection from cont using accept as the Listen callback, opens it and
++// sends it on the channel.
++func newServer(t *testing.T, cont Container, accept func(Endpoint) error) (net.Addr, <-chan Connection) {
++	listener, err := net.Listen("tcp", "")
++	fatalIf(t, err)
++	addr := listener.Addr()
++	ch := make(chan Connection)
++	go func() {
++		conn, err := listener.Accept()
++		// Only one connection is ever accepted, so release the listener now.
++		listener.Close()
++		// Check the Accept error before it is overwritten below.
++		fatalIf(t, err)
++		c, err := cont.Connection(conn)
++		fatalIf(t, err)
++		c.Server()
++		c.Listen(accept)
++		fatalIf(t, c.Open())
++		ch <- c
++	}()
++	return addr, ch
++}
++
++// newClient dials addr, opens a client connection from cont over it, creates
++// a session on that connection and returns the session. Any error fails the
++// test.
++func newClient(t *testing.T, cont Container, addr net.Addr) Session {
++	conn, err := net.Dial(addr.Network(), addr.String())
++	fatalIf(t, err)
++	c, err := cont.Connection(conn)
++	fatalIf(t, err)
++	// Open returns an error (newServer checks it); check it here as well.
++	fatalIf(t, c.Open())
++	sn, err := c.Session()
++	fatalIf(t, err)
++	return sn
++}
++
++// newClientServer returns the client session and the server connection at the
++// two ends of one connection. Each end gets its own container with a
++// generated id; accept is the server's Listen callback.
++func newClientServer(t *testing.T, accept func(Endpoint) error) (client Session, server Connection) {
++	addr, incoming := newServer(t, NewContainer(""), accept)
++	client = newClient(t, NewContainer(""), addr)
++	server = <-incoming
++	return client, server
++}
++
++// Close client and server.
++// The client's connection is closed first, then the server connection, both
++// without an error.
++func closeClientServer(client Session, server Connection) {
++ client.Connection().Close(nil)
++ server.Close(nil)
++}
++
++// TestClientSendServerReceive sends messages one way over several links, from
++// client senders to server receivers, and verifies the acknowledgment.
++//
++// For every message it checks that the body arrives intact, that the sender
++// sees no disposition before the receiver acknowledges, and that the sender
++// sees Rejected after the receiver acknowledges with Rejected.
++func TestClientSendServerReceive(t *testing.T) {
++	timeout := time.Second * 2
++	nLinks := 3
++	nMessages := 3
++
++	rchan := make(chan Receiver, nLinks)
++	client, server := newClientServer(t, func(ep Endpoint) error {
++		if r, ok := ep.(Receiver); ok {
++			r.SetCapacity(1, false)
++			rchan <- r
++		}
++		return nil
++	})
++
++	defer func() {
++		closeClientServer(client, server)
++	}()
++
++	s := make([]Sender, nLinks)
++	for i := 0; i < nLinks; i++ {
++		var err error
++		s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
++		if err != nil {
++			t.Fatal(err)
++		}
++	}
++	r := make([]Receiver, nLinks)
++	for i := 0; i < nLinks; i++ {
++		r[i] = <-rchan
++	}
++
++	for i := 0; i < nLinks; i++ {
++		for j := 0; j < nMessages; j++ {
++			var sm SentMessage
++			var sendErr error
++
++			// Client send. The error is handed back to the test goroutine:
++			// t.Fatal must not be called from another goroutine, and a failed
++			// send would leave sm nil for the checks below.
++			sendDone := make(chan struct{})
++			go func() {
++				defer close(sendDone)
++				m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
++				sm, sendErr = s[i].Send(m)
++			}()
++
++			// Server receive
++			rm, err := r[i].Receive()
++			if err != nil {
++				t.Fatal(err)
++			}
++			if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
++				t.Errorf("%#v != %#v", want, got)
++			}
++
++			// Should not be acknowledged on client yet
++			<-sendDone
++			if sendErr != nil {
++				t.Fatal(sendErr)
++			}
++			if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d {
++				t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err)
++			}
++			// Server ack
++			if err := rm.Acknowledge(Rejected); err != nil {
++				t.Error(err)
++			}
++			// Client get ack.
++			if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d {
++				t.Errorf("want [rejected/nil] got [%v/%v]", d, err)
++			}
++		}
++	}
++}
++
++// TestClientReceiver has the server send a sequence of int32 messages on the
++// sender it is offered, waiting for the disposition of each, while the client
++// receives and accepts them and checks that the bodies arrive in order.
++func TestClientReceiver(t *testing.T) {
++	nMessages := 3
++
++	// Server side: send nMessages synchronously, then close the sender.
++	serve := func(s Sender) {
++		for i := int32(0); i < int32(nMessages); i++ {
++			sm, err := s.Send(amqp.NewMessageWith(i))
++			if err != nil {
++				t.Error(err)
++				return
++			}
++			sm.Disposition() // Sync send.
++		}
++		s.Close(nil)
++	}
++	client, server := newClientServer(t, func(ep Endpoint) error {
++		if s, ok := ep.(Sender); ok {
++			go serve(s)
++		}
++		return nil
++	})
++
++	r, err := client.Receiver(Source("foo"))
++	if err != nil {
++		t.Fatal(err)
++	}
++	for i := int32(0); i < int32(nMessages); i++ {
++		rm, err := r.Receive()
++		if err == Closed {
++			break
++		}
++		if err != nil {
++			t.Error(err)
++			break
++		}
++		if err := rm.Accept(); err != nil {
++			t.Error(err)
++		}
++		if b, ok := rm.Message.Body().(int32); !ok || b != i {
++			t.Errorf("want %v, true got %v, %v", i, b, ok)
++		}
++	}
++	server.Close(nil)
++	client.Connection().Close(nil)
++}
++
++// TestTimeouts tests the timeout versions of the waiting functions:
++// SendTimeout, ReceiveTimeout and DispositionTimeout.
++func TestTimeouts(t *testing.T) {
++	var err error
++	rchan := make(chan Receiver, 1)
++	client, server := newClientServer(t, func(ep Endpoint) error {
++		if r, ok := ep.(Receiver); ok {
++			r.SetCapacity(1, false) // Issue credit only on receive
++			rchan <- r
++		}
++		return nil
++	})
++	defer func() { closeClientServer(client, server) }()
++
++	// Open client sender
++	snd, err := client.Sender(Target("test"))
++	if err != nil {
++		t.Fatal(err)
++	}
++	rcv := <-rchan
++
++	// Test send with timeout
++	short := time.Millisecond
++	long := time.Second
++	m := amqp.NewMessage()
++	if _, err = snd.SendTimeout(m, 0); err != Timeout { // No credit, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	if _, err = snd.SendTimeout(m, short); err != Timeout { // No credit, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	// Test receive with timeout
++	if _, err = rcv.ReceiveTimeout(0); err != Timeout { // Nothing sent yet, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	// Test receive with timeout
++	if _, err = rcv.ReceiveTimeout(short); err != Timeout { // Nothing sent yet, expect timeout.
++		t.Error("want Timeout got", err)
++	}
++	// There is now a credit on the link due to receive
++	sm, err := snd.SendTimeout(m, long)
++	if err != nil {
++		t.Fatal(err)
++	}
++	// Disposition should timeout
++	if _, err = sm.DispositionTimeout(long); err != Timeout {
++		t.Error("want Timeout got", err)
++	}
++	if _, err = sm.DispositionTimeout(short); err != Timeout {
++		t.Error("want Timeout got", err)
++	}
++	// Receive and accept
++	rm, err := rcv.ReceiveTimeout(long)
++	if err != nil {
++		t.Fatal(err)
++	}
++	if err := rm.Accept(); err != nil {
++		t.Error(err)
++	}
++	// Sender get ack
++	d, err := sm.DispositionTimeout(long)
++	if err != nil || d != Accepted {
++		t.Errorf("want (accepted, nil) got (%v, %v)", d, err)
++	}
++}
++
++// pairs is a client/server connection that hands out sender/receiver pairs at
++// opposite ends of a link.
++type pairs struct {
++ t *testing.T
++ client Session // Client end; links are created on this session.
++ server Connection // Server end of the same connection.
++ rchan chan Receiver // Receivers accepted by the server.
++ schan chan Sender // Senders accepted by the server.
++}
++
++func newPairs(t *testing.T) *pairs {
++ p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
++ p.client, p.server = newClientServer(t, func(ep Endpoint) error {
++ switch ep := ep.(type) {
++ case Receiver:
++ ep.SetCapacity(1, false)
++ p.rchan <- ep
++ case Sender:
++ p.schan <- ep
++ }
++ return nil
++ })
++ return p
++}
++
++// close closes both ends of the connection via closeClientServer.
++func (p *pairs) close() {
++ closeClientServer(p.client, p.server)
++}
++
++// senderReceiver creates a sender on the client and returns it together with
++// the matching receiver accepted by the server.
++func (p *pairs) senderReceiver() (Sender, Receiver) {
++	sender, err := p.client.Sender()
++	fatalIf(p.t, err)
++	return sender, <-p.rchan
++}
++
++// receiverSender creates a receiver on the client and returns it together
++// with the matching sender accepted by the server.
++func (p *pairs) receiverSender() (Receiver, Sender) {
++	receiver, err := p.client.Receiver()
++	fatalIf(p.t, err)
++	return receiver, <-p.schan
++}
++
++// result is the outcome of one blocked operation: a label naming the
++// operation and the error it returned.
++type result struct {
++	label string
++	err   error
++}
++
++// String formats the result as "<err>(<label>)". It uses %v for the error so
++// that a nil error prints as "<nil>" rather than "%!s(<nil>)".
++func (r result) String() string { return fmt.Sprintf("%v(%s)", r.err, r.label) }
++
++// doSend sends an empty message on snd and reports the error on results
++// under the label "send".
++func doSend(snd Sender, results chan result) {
++	_, sendErr := snd.Send(amqp.NewMessage())
++	results <- result{label: "send", err: sendErr}
++}
++
++// doReceive receives one message on rcv and reports the error on results
++// under the label "receive".
++func doReceive(rcv Receiver, results chan result) {
++	_, recvErr := rcv.Receive()
++	results <- result{label: "receive", err: recvErr}
++}
++
++// doDisposition waits for the disposition of sm and reports the error on
++// results under the label "disposition".
++func doDisposition(sm SentMessage, results chan result) {
++	_, dispErr := sm.Disposition()
++	results <- result{label: "disposition", err: dispErr}
++}
++
++// Test that closing Links interrupts blocked link functions.
++//
++// Each of the four cases creates a fresh sender/receiver pair, starts a
++// blocking Send or Receive in a goroutine, closes one end of the link with
++// the error `want`, and expects the blocked call to return exactly `want`.
++func TestLinkCloseInterrupt(t *testing.T) {
++ want := amqp.Errorf("x", "all bad")
++ pairs := newPairs(t)
++ results := make(chan result) // Collect expected errors
++
++ // Sender.Close() interrupts Send()
++ snd, rcv := pairs.senderReceiver()
++ go doSend(snd, results)
++ snd.Close(want)
++ if r := <-results; want != r.err {
++ t.Errorf("want %#v got %#v", want, r)
++ }
++
++ // Remote Receiver.Close() interrupts Send()
++ snd, rcv = pairs.senderReceiver()
++ go doSend(snd, results)
++ rcv.Close(want)
++ if r := <-results; want != r.err {
++ t.Errorf("want %#v got %#v", want, r)
++ }
++
++ // Receiver.Close() interrupts Receive()
++ snd, rcv = pairs.senderReceiver()
++ go doReceive(rcv, results)
++ rcv.Close(want)
++ if r := <-results; want != r.err {
++ t.Errorf("want %#v got %#v", want, r)
++ }
++
++ // Remote Sender.Close() interrupts Receive()
++ snd, rcv = pairs.senderReceiver()
++ go doReceive(rcv, results)
++ snd.Close(want)
++ if r := <-results; want != r.err {
++ t.Errorf("want %#v got %#v", want, r)
++ }
++}
++
++// Test closing the server end of a connection.
++//
++// Blocking Receive, Disposition and Send calls are started on several links,
++// then the server connection is closed with the error `want`. Mismatching
++// errors are only logged, not failed (see the TODO below).
++func TestConnectionCloseInterrupt1(t *testing.T) {
++ want := amqp.Errorf("x", "bad")
++ pairs := newPairs(t)
++ results := make(chan result) // Collect expected errors
++
++ // Connection.Close() interrupts Send, Receive, Disposition.
++ snd, rcv := pairs.senderReceiver()
++ go doReceive(rcv, results)
++ sm, err := snd.Send(amqp.NewMessage())
++ fatalIf(t, err)
++ go doDisposition(sm, results)
++ snd, rcv = pairs.senderReceiver()
++ go doSend(snd, results)
++ rcv, snd = pairs.receiverSender()
++ go doReceive(rcv, results)
++ pairs.server.Close(want)
++ // NOTE(review): four goroutines were started above but only three results
++ // are read; the fourth stays blocked sending on the unbuffered channel.
++ for i := 0; i < 3; i++ {
++ if r := <-results; want != r.err {
++ // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil and EOF.
++ t.Logf("want %v got %v", want, r)
++ }
++ }
++}
++
++// Test closing the client end of the connection.
++//
++// Blocking Receive, Disposition and Send calls are started on several links,
++// then Close(want) is called on the client end (pairs.client). Mismatching
++// errors are only logged, not failed (see the TODO below).
++func TestConnectionCloseInterrupt2(t *testing.T) {
++ want := amqp.Errorf("x", "bad")
++ pairs := newPairs(t)
++ results := make(chan result) // Collect expected errors
++
++ // Connection.Close() interrupts Send, Receive, Disposition.
++ snd, rcv := pairs.senderReceiver()
++ go doReceive(rcv, results)
++ sm, err := snd.Send(amqp.NewMessage())
++ fatalIf(t, err)
++ go doDisposition(sm, results)
++ snd, rcv = pairs.senderReceiver()
++ go doSend(snd, results)
++ rcv, snd = pairs.receiverSender()
++ go doReceive(rcv, results)
++ pairs.client.Close(want)
++ // NOTE(review): four goroutines were started above but only three results
++ // are read; the fourth stays blocked sending on the unbuffered channel.
++ for i := 0; i < 3; i++ {
++ if r := <-results; want != r.err {
++ // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil.
++ t.Logf("want %v got %v", want, r)
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/receiver.go
----------------------------------------------------------------------
diff --cc electron/receiver.go
index 0000000,0000000..59ac018
new file mode 100644
--- /dev/null
+++ b/electron/receiver.go
@@@ -1,0 -1,0 +1,238 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "qpid.apache.org/amqp"
++ "qpid.apache.org/internal"
++ "qpid.apache.org/proton"
++ "time"
++)
++
++// Receiver is a Link that receives messages.
++//
++type Receiver interface {
++ Link
++
++ // Receive blocks until a message is available or until the Receiver is closed
++ // and has no more buffered messages.
++ Receive() (ReceivedMessage, error)
++
++ // ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
++ //
++ // Note that if Prefetch is false, after a Timeout the credit issued by
++ // Receive remains on the link. It will be used by the next call to Receive.
++ ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)
++
++ // Prefetch==true means the Receiver will automatically issue credit to the
++ // remote sender to keep its buffer as full as possible, i.e. it will
++ // "pre-fetch" messages independently of the application calling
++ // Receive(). This gives good throughput for applications that handle a
++ // continuous stream of messages. Larger capacity may improve throughput, the
++ // optimal value depends on the characteristics of your application.
++ //
++ // Prefetch==false means the Receiver will only issue credit when you
++ // call Receive(), and will only issue enough credit to satisfy the calls
++ // actually made. This gives lower throughput but will not fetch any messages
++ // in advance. It is good for synchronous applications that need to evaluate
++ // each message before deciding whether to receive another. The
++ // request-response pattern is a typical example. If you make concurrent
++ // calls to Receive with pre-fetch disabled, you can improve performance by
++ // setting the capacity close to the expected number of concurrent calls.
++ //
++ Prefetch() bool
++
++ // Capacity is the size (number of messages) of the local message buffer.
++ // These are messages received but not yet returned to the application by a
++ // call to Receive().
++ Capacity() int
++}
++
++// policy is the flow control policy for a receiver. ReceiveTimeout calls Pre
++// before waiting for a message and Post (via defer) just before it returns.
++type policy interface {
++ // Called at the start of Receive() to adjust credit before fetching a message.
++ Pre(*receiver)
++ // Called after Receive() has taken a message from the buffer, before it returns.
++ // Non-nil error means no message was received because of an error.
++ Post(*receiver, error)
++}
++
++// prefetchPolicy is the flow control policy used when pre-fetch is enabled: it
++// keeps the remote sender supplied with enough credit to fill the local buffer.
++type prefetchPolicy struct{}
++
++// Flow tops up link credit so that outstanding credit equals the free space in
++// the receiver's buffer. Credit already on the link is subtracted first:
++// granting credit adds to what is already outstanding, so issuing the full free
++// space on every call would let the sender deliver more messages than the
++// buffer can hold.
++func (p prefetchPolicy) Flow(r *receiver) {
++ r.engine().Inject(func() { // Runs in the proton goroutine, as r.credit() requires.
++ _, credit, max := r.credit()
++ if add := max - credit; add > 0 {
++ r.eLink.Flow(add)
++ }
++ })
++}
++
++// Pre tops up credit before Receive waits for a message.
++func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) }
++
++// Post tops up credit again after a message was received without error.
++func (p prefetchPolicy) Post(r *receiver, err error) {
++ if err == nil {
++ p.Flow(r)
++ }
++}
++
++// noPrefetchPolicy is the flow control policy used when pre-fetch is disabled:
++// it issues only enough credit to satisfy the Receive calls currently waiting.
++//
++// The methods use pointer receivers so that the waiting count updated by Pre
++// and Post persists between calls; with value receivers each call would
++// modify a throw-away copy of the policy.
++//
++// NOTE(review): waiting is updated by whichever goroutine calls Receive, with
++// no synchronization visible here — confirm whether concurrent Receive calls
++// need an atomic counter or a lock.
++type noPrefetchPolicy struct{ waiting int }
++
++// Flow issues credit for waiting callers not already covered by buffered
++// messages or outstanding credit, limited to the free space in the buffer.
++func (p *noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine
++ waiting := p.waiting // Read in the caller's goroutine, not in the injected function.
++ r.engine().Inject(func() {
++ buffered, credit, max := r.credit()
++ add := waiting - (buffered + credit)
++ if add > max {
++ add = max // Don't overflow
++ }
++ if add > 0 {
++ r.eLink.Flow(add)
++ }
++ })
++}
++
++// Pre records one more waiting Receive call and issues credit for it.
++func (p *noPrefetchPolicy) Pre(r *receiver) { p.waiting++; p.Flow(r) }
++
++// Post records that a Receive call has finished; after a successful receive it
++// re-evaluates the credit needed by the remaining callers.
++func (p *noPrefetchPolicy) Post(r *receiver, err error) {
++ p.waiting--
++ if err == nil {
++ p.Flow(r)
++ }
++}
++
++// receiver is the Receiver implementation.
++type receiver struct {
++ link
++ buffer chan ReceivedMessage // Messages received but not yet returned by Receive.
++ policy policy // Flow control policy, chosen in newReceiver from the prefetch flag.
++}
++
++// newReceiver wraps link l in a receiver, registers it with the handler and
++// opens the link. A capacity below 1 is raised to 1 so the buffer channel can
++// always hold at least one message.
++func newReceiver(l link) *receiver {
++ r := &receiver{link: l}
++ if r.capacity < 1 {
++ r.capacity = 1
++ }
++ if r.prefetch {
++ r.policy = &prefetchPolicy{}
++ } else {
++ r.policy = &noPrefetchPolicy{}
++ }
++ r.buffer = make(chan ReceivedMessage, r.capacity)
++ r.handler().addLink(r.eLink, r)
++ r.link.open()
++ return r
++}
++
++// credit reports the number of buffered messages, the credit currently on the
++// link, and the free space left in the buffer. Call in proton goroutine.
++func (r *receiver) credit() (buffered, credit, max int) {
++ return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer)
++}
++
++// Capacity returns the size of the message buffer; Prefetch returns the
++// pre-fetch flag of the link.
++func (r *receiver) Capacity() int { return cap(r.buffer) }
++func (r *receiver) Prefetch() bool { return r.prefetch }
++
++// Receive is ReceiveTimeout with a timeout of Forever.
++func (r *receiver) Receive() (rm ReceivedMessage, err error) {
++ return r.ReceiveTimeout(Forever)
++}
++
++func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
++ internal.Assert(r.buffer != nil, "Receiver is not open: %s", r)
++ r.policy.Pre(r)
++ defer func() { r.policy.Post(r, err) }()
++ rmi, err := timedReceive(r.buffer, timeout)
++ switch err {
++ case Timeout:
++ return ReceivedMessage{}, Timeout
++ case Closed:
++ return ReceivedMessage{}, r.Error()
++ default:
++ return rmi.(ReceivedMessage), nil
++ }
++}
++
++// Called in proton goroutine on MMessage event.
++//
++// Decodes the message of a completed delivery and queues it on the buffer for
++// Receive. The link is closed locally if the remote end has already closed,
++// if decoding fails, or if the message exceeds the credit that was issued.
++func (r *receiver) message(delivery proton.Delivery) {
++ if r.eLink.State().RemoteClosed() {
++ localClose(r.eLink, r.eLink.RemoteCondition().Error())
++ return
++ }
++ if delivery.HasMessage() {
++ m, err := delivery.Message()
++ if err != nil {
++ localClose(r.eLink, err)
++ return
++ }
++ internal.Assert(m != nil)
++ r.eLink.Advance()
++ if r.eLink.Credit() < 0 {
++ localClose(r.eLink, internal.Errorf("received message in excess of credit limit"))
++ } else {
++ // We never issue more credit than cap(buffer) so this will not block.
++ r.buffer <- ReceivedMessage{m, delivery, r}
++ }
++ }
++}
++
++// closed marks the underlying link closed with err and closes the message
++// buffer, which makes a pending timedReceive on it report Closed.
++func (r *receiver) closed(err error) {
++ r.link.closed(err)
++ if r.buffer != nil {
++ close(r.buffer)
++ }
++}
++
++// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
++type ReceivedMessage struct {
++ // Message is the received message.
++ Message amqp.Message
++
++ eDelivery proton.Delivery // Delivery the message arrived on, settled by Acknowledge.
++ receiver Receiver // Receiver that produced this message.
++}
++
++// Acknowledge a ReceivedMessage with the given disposition code.
++//
++// The settlement is performed via the receiver's engine; the error returned is
++// the receiver's error, if any.
++func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error {
++ return rm.receiver.(*receiver).engine().InjectWait(func() error {
++ // Settle doesn't return an error but if the receiver is broken the settlement won't happen.
++ rm.eDelivery.SettleAs(uint64(disposition))
++ return rm.receiver.Error()
++ })
++}
++
++// Accept is short for Acknowledge(Accepted)
++func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
++
++// Reject is short for Acknowledge(Rejected)
++func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
++
++// IncomingReceiver is passed to the accept() function given to Connection.Listen()
++// when there is an incoming request for a receiver link.
++type IncomingReceiver struct {
++ incomingLink
++}
++
++// Link provides information about the incoming link.
++func (i *IncomingReceiver) Link() Link { return i }
++
++// AcceptReceiver sets Capacity and Prefetch of the accepted Receiver and then
++// accepts the link.
++func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver {
++ i.capacity = capacity
++ i.prefetch = prefetch
++ return i.Accept().(Receiver)
++}
++
++// Accept marks the request as accepted and returns a new receiver for the link.
++func (i *IncomingReceiver) Accept() Endpoint {
++ i.accepted = true
++ return newReceiver(i.link)
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/sender.go
----------------------------------------------------------------------
diff --cc electron/sender.go
index 0000000,0000000..68cfbb3
new file mode 100644
--- /dev/null
+++ b/electron/sender.go
@@@ -1,0 -1,0 +1,315 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++// #include <proton/disposition.h>
++import "C"
++
++import (
++ "qpid.apache.org/amqp"
++ "qpid.apache.org/internal"
++ "qpid.apache.org/proton"
++ "reflect"
++ "time"
++)
++
++// Sender is a Link that sends messages.
++type Sender interface {
++ Link
++
++ // Send a message without waiting for acknowledgement. Returns a SentMessage.
++ // Use SentMessage.Disposition() to wait for acknowledgement and get the
++ // disposition code.
++ //
++ // If the send buffer is full, Send blocks until there is space in the buffer.
++ Send(m amqp.Message) (sm SentMessage, err error)
++
++ // SendTimeout is like Send but only waits up to timeout for buffer space.
++ //
++ // Returns Timeout error if the timeout expires and the message has not been sent.
++ SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err error)
++
++ // Send a message and forget it, there will be no acknowledgement.
++ // If the send buffer is full, SendForget blocks until there is space in the buffer.
++ SendForget(m amqp.Message) error
++
++ // SendForgetTimeout is like SendForget but only waits up to timeout for buffer space.
++ // Returns Timeout error if the timeout expires and the message has not been sent.
++ SendForgetTimeout(m amqp.Message, timeout time.Duration) error
++
++ // Credit indicates how many messages the receiving end of the link can accept.
++ //
++ // On a Sender credit can be negative, meaning that messages in excess of the
++ // receiver's credit limit have been buffered locally till credit is available.
++ Credit() (int, error)
++}
++
++// sendMessage pairs a message to send with the SentMessage that tracks it.
++// sm is nil when no acknowledgement is tracked (send-and-forget).
++type sendMessage struct {
++ m amqp.Message
++ sm SentMessage
++}
++
++// sender is the Sender implementation.
++type sender struct {
++ link
++ credit chan struct{} // Signal available credit.
++}
++
++// Disposition indicates the outcome of a settled message delivery.
++type Disposition uint64
++
++const (
++ // No disposition available: pre-settled, not yet acknowledged or an error occurred
++ NoDisposition Disposition = 0
++ // Message was accepted by the receiver
++ Accepted = proton.Accepted
++ // Message was rejected as invalid by the receiver
++ Rejected = proton.Rejected
++ // Message was not processed by the receiver but may be processed by some other receiver.
++ Released = proton.Released
++)
++
++// String returns a human readable name for a Disposition, or "unknown" for a
++// value that is not one of the constants above.
++func (d Disposition) String() string {
++ switch d {
++ case NoDisposition:
++ return "no-disposition"
++ case Accepted:
++ return "accepted"
++ case Rejected:
++ return "rejected"
++ case Released:
++ return "released"
++ default:
++ return "unknown"
++ }
++}
++
++// Send is SendTimeout with a timeout of Forever.
++func (s *sender) Send(m amqp.Message) (SentMessage, error) {
++ return s.SendTimeout(m, Forever)
++}
++
++// SendTimeout sends m, waiting up to timeout for credit. On a pre-settled link
++// (sndSettle == SndSettled) no SentMessage is created and nil is returned in
++// its place.
++func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) (SentMessage, error) {
++ var sm SentMessage
++ if s.sndSettle == SndSettled {
++ sm = nil
++ } else {
++ sm = newSentMessage(s.session.connection)
++ }
++ return s.sendInternal(sendMessage{m, sm}, timeout)
++}
++
++// SendForget is SendForgetTimeout with a timeout of Forever.
++func (s *sender) SendForget(m amqp.Message) error {
++ return s.SendForgetTimeout(m, Forever)
++}
++
++// SendForgetTimeout sends m without tracking its acknowledgement, waiting up
++// to timeout for credit.
++func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error {
++ snd := sendMessage{m, nil}
++ _, err := s.sendInternal(snd, timeout)
++ return err
++}
++
++// sendInternal waits up to timeout for a credit signal and then injects the
++// actual send into the engine. If the credit channel was closed, the sender's
++// own error is returned instead of Closed.
++func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) {
++ if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit
++ if err == Closed {
++ err = s.Error()
++ internal.Assert(err != nil)
++ }
++ return nil, err
++ }
++ if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil {
++ return nil, err
++ }
++ return snd.sm, nil
++}
++
++// Send a message. Handler goroutine
++//
++// Hands the message to the proton link. For a tracked SentMessage the delivery
++// is recorded in the handler's sentMessages map, or the SentMessage is
++// completed immediately if sending failed. Re-signals credit if the link still
++// has some after this send.
++func (s *sender) doSend(snd sendMessage) {
++ delivery, err := s.eLink.Send(snd.m)
++ switch sm := snd.sm.(type) {
++ case nil:
++ // Send-and-forget: settle straight away.
++ // NOTE(review): err is not examined on this path — confirm that settling
++ // the returned delivery is valid when Send fails.
++ delivery.Settle()
++ case *sentMessage:
++ sm.delivery = delivery
++ if err != nil {
++ sm.settled(err)
++ } else {
++ s.handler().sentMessages[delivery] = sm
++ }
++ default:
++ internal.Assert(false, "bad SentMessage type %T", snd.sm)
++ }
++ if s.eLink.Credit() > 0 {
++ s.sendable() // Signal credit.
++ }
++}
++
++// Signal the sender has credit. Any goroutine.
++//
++// The credit channel has a buffer of one (see newSender), so it acts as a
++// flag: the send is skipped if a signal is already pending.
++func (s *sender) sendable() {
++ select { // Non-blocking
++ case s.credit <- struct{}{}: // Set the flag if not already set.
++ default:
++ }
++}
++
++// closed marks the underlying link closed with err and closes the credit
++// channel, which makes a pending timedReceive in sendInternal report Closed.
++//
++// NOTE(review): sendable sends on the same channel; confirm it can never run
++// after closed, since a send on a closed channel panics.
++func (s *sender) closed(err error) {
++ s.link.closed(err)
++ close(s.credit)
++}
++
++// newSender wraps link l in a sender, registers it with the handler and opens
++// the link.
++func newSender(l link) *sender {
++ s := &sender{link: l, credit: make(chan struct{}, 1)}
++ s.handler().addLink(s.eLink, s)
++ s.link.open()
++ return s
++}
++
++// SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
++type SentMessage interface {
++
++ // Disposition blocks till the message is acknowledged and returns the
++ // disposition state.
++ //
++ // NoDisposition with Error() != nil means the Connection was closed before
++ // the message was acknowledged.
++ //
++ // NoDisposition with Error() == nil means the message was pre-settled or
++ // Forget() was called.
++ Disposition() (Disposition, error)
++
++ // DispositionTimeout is like Disposition but gives up after timeout, see Timeout.
++ DispositionTimeout(time.Duration) (Disposition, error)
++
++ // Forget interrupts any call to Disposition on this SentMessage and tells the
++ // peer we are no longer interested in the disposition of this message.
++ Forget()
++
++ // Error returns the error that closed the disposition, or nil if there was no error.
++ // If the disposition closed because the connection closed, it will return Closed.
++ Error() error
++
++ // Value is an optional value you wish to associate with the SentMessage. It
++ // can be the message itself or some form of identifier.
++ Value() interface{}
++ // SetValue sets the value returned by Value.
++ SetValue(interface{})
++}
++
++// SentMessageSet is a set of sent messages that can be checked to get the next
++// completed sent message.
++//
++// NOTE(review): originally described as concurrent-safe, but no
++// synchronization is visible in this type — confirm before sharing a set
++// between goroutines.
++type SentMessageSet struct {
++ cases []reflect.SelectCase // One receive case per entry of sm, on its done channel.
++ sm []SentMessage
++ done chan SentMessage
++}
++
++// Add adds sm to the set. sm must have been created by this package: it is
++// asserted to be a *sentMessage in order to reach its done channel.
++func (s *SentMessageSet) Add(sm SentMessage) {
++ s.sm = append(s.sm, sm)
++ s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sm.(*sentMessage).done)})
++}
++
++// Wait waits up to timeout and returns the next SentMessage in the set that has
++// completed, removing it from the set. It returns Timeout if none completes in
++// time; timeout == 0 makes a non-blocking check.
++//
++// The sm parameter is not used; it is kept so existing callers still compile.
++func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) (SentMessage, error) {
++ n := len(s.sm)
++ // Temporarily append the timeout (or default) case at index n.
++ if timeout == 0 { // Non-blocking
++ s.cases = append(s.cases[:n], reflect.SelectCase{Dir: reflect.SelectDefault})
++ } else {
++ s.cases = append(s.cases[:n],
++ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
++ }
++ chosen, _, _ := reflect.Select(s.cases)
++ // Drop the timeout case again so cases stays index-aligned with s.sm for Add.
++ s.cases = s.cases[:n]
++ if chosen >= n { // Index n is the timeout/default case.
++ return nil, Timeout
++ }
++ completed := s.sm[chosen]
++ // Remove the chosen entry from both slices so they keep matching.
++ s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...)
++ s.cases = append(s.cases[:chosen], s.cases[chosen+1:]...)
++ return completed, nil
++}
++
++// sentMessage is the SentMessage implementation.
++type sentMessage struct {
++ connection *connection
++ done chan struct{} // Closed by finish() when the outcome is known.
++ delivery proton.Delivery
++ disposition Disposition
++ err error
++ value interface{}
++}
++
++// newSentMessage returns a sentMessage for connection c whose outcome is not
++// yet known.
++func newSentMessage(c *connection) *sentMessage {
++ return &sentMessage{connection: c, done: make(chan struct{})}
++}
++
++// SetValue and Value store and return the application value associated with
++// the message.
++func (sm *sentMessage) SetValue(v interface{}) { sm.value = v }
++func (sm *sentMessage) Value() interface{} { return sm.value }
++
++// Disposition blocks until done is closed, then returns the recorded
++// disposition and error.
++func (sm *sentMessage) Disposition() (Disposition, error) {
++ <-sm.done
++ return sm.disposition, sm.err
++}
++
++// DispositionTimeout waits up to timeout for the outcome of the message. It
++// returns Timeout if the wait expired, otherwise the recorded disposition and
++// error.
++func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
++ _, waitErr := timedReceive(sm.done, timeout)
++ if waitErr == Timeout {
++ return sm.disposition, Timeout
++ }
++ return sm.disposition, sm.err
++}
++
++// Forget injects a function that settles the delivery and removes it from the
++// handler's sentMessages map, then completes the message without waiting for
++// that function to run.
++func (sm *sentMessage) Forget() {
++ sm.connection.engine.Inject(func() {
++ sm.delivery.Settle()
++ delete(sm.connection.handler.sentMessages, sm.delivery)
++ })
++ sm.finish()
++}
++
++// settled records the outcome of the message: the remote disposition if the
++// delivery is settled, plus err, and then completes the message.
++func (sm *sentMessage) settled(err error) {
++ if sm.delivery.Settled() {
++ sm.disposition = Disposition(sm.delivery.Remote().Type())
++ }
++ sm.err = err
++ sm.finish()
++}
++
++// finish closes done to release anyone waiting on the outcome.
++//
++// NOTE(review): the check-then-close below is not atomic; confirm finish is
++// never reached from two goroutines at once (Forget calls it directly).
++func (sm *sentMessage) finish() {
++ select {
++ case <-sm.done: // No-op if already closed
++ default:
++ close(sm.done)
++ }
++}
++
++// Error returns the error recorded by settled, or nil.
++func (sm *sentMessage) Error() error { return sm.err }
++
++// IncomingSender is passed to the accept() function given to Connection.Listen()
++// when there is an incoming request for a sender link.
++type IncomingSender struct {
++ incomingLink
++}
++
++// Link provides information about the incoming link.
++func (i *IncomingSender) Link() Link { return i }
++
++// AcceptSender accepts the link and returns it as a Sender.
++func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) }
++
++// Accept marks the request as accepted and returns a new sender for the link.
++func (i *IncomingSender) Accept() Endpoint {
++ i.accepted = true
++ return newSender(i.link)
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/session.go
----------------------------------------------------------------------
diff --cc electron/session.go
index 0000000,0000000..3531da6
new file mode 100644
--- /dev/null
+++ b/electron/session.go
@@@ -1,0 -1,0 +1,125 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "qpid.apache.org/proton"
++)
++
++// Session is an AMQP session, it contains Senders and Receivers.
++type Session interface {
++ Endpoint
++
++ // Sender opens a new sender, configured by the LinkSetting values given.
++ Sender(...LinkSetting) (Sender, error)
++
++ // Receiver opens a new Receiver, configured by the LinkSetting values given.
++ Receiver(...LinkSetting) (Receiver, error)
++}
++
++// session is the Session implementation.
++type session struct {
++ endpoint
++ eSession proton.Session
++ connection *connection
++ capacity uint // Incoming capacity passed to the proton session in newSession.
++}
++
++// SessionSetting is a function that configures a session; newSession applies
++// every setting it is given before opening the session.
++// See functions that return SessionSetting for details.
++type SessionSetting func(*session)
++
++// IncomingCapacity sets the size (in bytes) of the session's incoming data buffer.
++func IncomingCapacity(cap uint) SessionSetting { return func(s *session) { s.capacity = cap } }
++
++// newSession creates a session on connection c for proton session es, applies
++// the settings, registers the session with the connection's handler and opens
++// it. Call in proton goroutine.
++func newSession(c *connection, es proton.Session, setting ...SessionSetting) *session {
++ s := &session{
++ connection: c,
++ eSession: es,
++ endpoint: endpoint{str: es.String()},
++ }
++ for _, set := range setting {
++ set(s)
++ }
++ c.handler.sessions[s.eSession] = s
++ s.eSession.SetIncomingCapacity(s.capacity)
++ s.eSession.Open()
++ return s
++}
++
++// Accessors for the owning connection, the proton endpoint and the engine.
++func (s *session) Connection() Connection { return s.connection }
++func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
++func (s *session) engine() *proton.Engine { return s.connection.engine }
++
++// Close injects a local close of the proton session with err into the engine.
++func (s *session) Close(err error) {
++ s.engine().Inject(func() { localClose(s.eSession, err) })
++}
++
++// SetCapacity stores the capacity in bytes. The stored value is only passed to
++// the proton session in newSession.
++func (s *session) SetCapacity(bytes uint) { s.capacity = bytes }
++
++// Sender creates a local link with the given settings and wraps it in a
++// sender. The work is done by a function injected into the engine; Sender
++// waits for it to finish and returns its error.
++func (s *session) Sender(setting ...LinkSetting) (snd Sender, err error) {
++ err = s.engine().InjectWait(func() error {
++ // This err shadows the named result; it reaches the caller via InjectWait.
++ l, err := localLink(s, true, setting...)
++ if err == nil {
++ snd = newSender(l)
++ }
++ return err
++ })
++ return
++}
++
++// Receiver creates a local link with the given settings and wraps it in a
++// receiver. The work is done by a function injected into the engine; Receiver
++// waits for it to finish and returns its error.
++func (s *session) Receiver(setting ...LinkSetting) (rcv Receiver, err error) {
++ err = s.engine().InjectWait(func() error {
++ // This err shadows the named result; it reaches the caller via InjectWait.
++ l, err := localLink(s, false, setting...)
++ if err == nil {
++ rcv = newReceiver(l)
++ }
++ return err
++ })
++ return
++}
++
++// Called from handler on closed.
++//
++// Records err, then Closed. Presumably s.err keeps only the first non-nil
++// error set, so Closed is recorded only when err is nil — TODO confirm
++// against the endpoint's err field type.
++func (s *session) closed(err error) {
++ s.err.Set(err)
++ s.err.Set(Closed)
++}
++
++// IncomingSession is passed to the accept() function given to Connection.Listen()
++// when there is an incoming session request.
++type IncomingSession struct {
++ incoming
++ h *handler
++ pSession proton.Session
++ capacity uint
++}
++
++// AcceptSession sets the session buffer capacity of an incoming session in
++// bytes and accepts it.
++func (i *IncomingSession) AcceptSession(bytes uint) Session {
++ i.capacity = bytes
++ return i.Accept().(Session)
++}
++
++// Accept marks the request as accepted and returns a new session that uses the
++// stored capacity as its incoming capacity.
++func (i *IncomingSession) Accept() Endpoint {
++ i.accepted = true
++ return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity))
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/electron/time.go
----------------------------------------------------------------------
diff --cc electron/time.go
index 0000000,0000000..3407b82
new file mode 100644
--- /dev/null
+++ b/electron/time.go
@@@ -1,0 -1,0 +1,82 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package electron
++
++import (
++ "qpid.apache.org/internal"
++ "reflect"
++ "time"
++)
++
++// Timeout is the error returned if an operation does not complete on time.
++//
++// Methods named *Timeout in this package take a time.Duration timeout parameter.
++//
++// If timeout > 0 and there is no result available before the timeout, they
++// return a zero or nil value and Timeout as an error.
++//
++// If timeout == 0 they will return a result if one is immediately available or
++// nil/zero and Timeout as an error if not.
++//
++// If timeout == Forever the function will return only when there is a result or
++// some non-timeout error occurs.
++//
++var Timeout = internal.Errorf("timeout")
++
++// Forever can be used as a timeout parameter to indicate wait forever.
++const Forever time.Duration = -1
++
++// timedReceive receives on channel (which can be a chan of any type), waiting
++// up to timeout.
++//
++// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block
++// forever. Other values mean block up to the timeout.
++//
++// Returns the received value, or error Timeout on timeout (including a
++// non-blocking attempt that found nothing), or Closed on channel close.
++func timedReceive(channel interface{}, timeout time.Duration) (interface{}, error) {
++ cases := []reflect.SelectCase{
++ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)},
++ }
++ switch {
++ case timeout == 0: // Non-blocking
++ cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})
++ case timeout > 0: // Block up to timeout
++ cases = append(cases,
++ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
++ }
++ // timeout < 0: no second case is added, so Select blocks until channel is
++ // ready or closed.
++ chosen, value, ok := reflect.Select(cases)
++ // ok is only meaningful for the case that was chosen: it is also false when
++ // the default case fires, so check which case was chosen first.
++ switch {
++ case chosen != 0:
++ return nil, Timeout
++ case !ok:
++ return nil, Closed
++ default:
++ return value.Interface(), nil
++ }
++}
++
++// After behaves like time.After, except that a timeout of Forever yields a nil
++// channel: a select on a nil channel never proceeds, so the wait never expires.
++func After(timeout time.Duration) <-chan time.Time {
++ if timeout != Forever {
++ return time.After(timeout)
++ }
++ return nil
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/error.go
----------------------------------------------------------------------
diff --cc internal/error.go
index 0000000,0000000..1b108e6
new file mode 100644
--- /dev/null
+++ b/internal/error.go
@@@ -1,0 -1,0 +1,118 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++// Internal implementation details - ignore.
++package internal
++
++// #cgo LDFLAGS: -lqpid-proton
++// #include <proton/error.h>
++// #include <proton/codec.h>
++import "C"
++
++import (
++ "fmt"
++ "sync"
++ "sync/atomic"
++ "unsafe"
++)
++
++// Error type for proton runtime errors returned as error values.
++type Error string
++
++// Error returns the message prefixed with "proton: ".
++func (e Error) Error() string {
++ return "proton: " + string(e)
++}
++
++// Errorf creates an Error with a formatted message; format and a are passed to
++// fmt.Sprintf.
++func Errorf(format string, a ...interface{}) Error {
++ return Error(fmt.Sprintf(format, a...))
++}
++
++// PnErrorCode is an integer error code as used by the proton C library.
++type PnErrorCode int
++
++// String returns a short name for the known proton C error codes, or
++// "unknown-error(N)" for any other value.
++func (e PnErrorCode) String() string {
++ switch e {
++ case C.PN_EOS:
++ return "end-of-data"
++ case C.PN_ERR:
++ return "error"
++ case C.PN_OVERFLOW:
++ return "overflow"
++ case C.PN_UNDERFLOW:
++ return "underflow"
++ case C.PN_STATE_ERR:
++ return "bad-state"
++ case C.PN_ARG_ERR:
++ return "invalid-argument"
++ case C.PN_TIMEOUT:
++ return "timeout"
++ case C.PN_INTR:
++ return "interrupted"
++ case C.PN_INPROGRESS:
++ return "in-progress"
++ default:
++ return fmt.Sprintf("unknown-error(%d)", e)
++ }
++}
++
++// PnError converts p, treated as a pointer to a C pn_error_t, into a Go error.
++// It returns nil if p is nil or its error code is 0; otherwise an Error made
++// of the code name and the error text.
++func PnError(p unsafe.Pointer) error {
++ e := (*C.pn_error_t)(p)
++ if e == nil || C.pn_error_code(e) == 0 {
++ return nil
++ }
++ return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e)))
++}
++
++// panicIf panics with Errorf(fmt, args...) when condition holds and does
++// nothing otherwise.
++func panicIf(condition bool, fmt string, args ...interface{}) {
++ if !condition {
++ return
++ }
++ panic(Errorf(fmt, args...))
++}
++
++// ErrorHolder is a goroutine-safe error holder that keeps the first error that is set.
++type ErrorHolder struct {
++ once sync.Once
++ value atomic.Value
++}
++
++// Set the error if not already set. A nil err is ignored; once a non-nil error
++// has been stored, later calls do not replace it.
++func (e *ErrorHolder) Set(err error) {
++ if err != nil {
++ e.once.Do(func() { e.value.Store(err) })
++ }
++}
++
++// Get the error, or nil if none has been set.
++func (e *ErrorHolder) Get() (err error) {
++ err, _ = e.value.Load().(error)
++ return
++}
++
++// Assert panics when condition is false. With no extra arguments the panic
++// value is Errorf("assertion failed"); otherwise the first extra argument is
++// used as the format string and the rest as its arguments.
++func Assert(condition bool, format ...interface{}) {
++ if condition {
++ return
++ }
++ if len(format) == 0 {
++ panic(Errorf("assertion failed"))
++ }
++ panic(Errorf(format[0].(string), format[1:]...))
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/flexchannel.go
----------------------------------------------------------------------
diff --cc internal/flexchannel.go
index 0000000,0000000..77b524c
new file mode 100644
--- /dev/null
+++ b/internal/flexchannel.go
@@@ -1,0 -1,0 +1,82 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package internal
++
++// FlexChannel acts like a channel with an automatically sized buffer, see NewFlexChannel().
++type FlexChannel struct {
++ // In channel to send to. close(In) will close the FlexChannel once buffer has drained.
++ In chan<- interface{}
++ // Out channel to receive from. Out closes when In has closed and the buffer is empty.
++ Out <-chan interface{}
++
++ in, out chan interface{} // Bidirectional views of In and Out, used by run().
++ buffer []interface{} // Values received on in and not yet sent on out, oldest first.
++ limit int // Maximum buffer length, < 0 means unlimited.
++}
++
++// NewFlexChannel creates a FlexChannel, a channel with an automatically-sized buffer.
++//
++// Initially the buffer size is 0, the buffer grows as needed up to limit. limit < 0 means
++// there is no limit.
++//
++// A goroutine is started to move values from In to Out; it exits after In is
++// closed and the buffered values have been delivered.
++//
++func NewFlexChannel(limit int) *FlexChannel {
++ fc := &FlexChannel{
++ in: make(chan interface{}),
++ out: make(chan interface{}),
++ buffer: make([]interface{}, 0),
++ limit: limit,
++ }
++ fc.In = fc.in
++ fc.Out = fc.out
++ go fc.run()
++ return fc
++}
++
++// run moves values from in to out through buffer until in is closed, then
++// delivers what is left in the buffer and closes out.
++func (fc *FlexChannel) run() {
++ defer func() { // Flush the channel on exit
++ for _, data := range fc.buffer {
++ fc.out <- data
++ }
++ close(fc.out)
++ }()
++
++ for {
++ // A nil channel never proceeds in a select, so leaving usein/useout nil
++ // disables that direction for this iteration.
++ var usein, useout chan interface{}
++ var outvalue interface{}
++ if len(fc.buffer) > 0 {
++ useout = fc.out
++ outvalue = fc.buffer[0]
++ }
++ if len(fc.buffer) < fc.limit || fc.limit < 0 {
++ usein = fc.in
++ }
++ // Note: with limit == 0 and an empty buffer both are nil and this panics.
++ Assert(usein != nil || useout != nil)
++ select {
++ case useout <- outvalue:
++ fc.buffer = fc.buffer[1:]
++ case data, ok := <-usein:
++ if ok {
++ fc.buffer = append(fc.buffer, data)
++ } else {
++ return
++ }
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/flexchannel_test.go
----------------------------------------------------------------------
diff --cc internal/flexchannel_test.go
index 0000000,0000000..d0e1a44
new file mode 100644
--- /dev/null
+++ b/internal/flexchannel_test.go
@@@ -1,0 -1,0 +1,89 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package internal
++
++import (
++ "testing"
++)
++
++// recvall receives from ch without blocking and returns the values in the
++// order received. It stops as soon as no value is immediately available or
++// ch is closed.
++func recvall(ch <-chan interface{}) (result []interface{}) {
++	for {
++		select {
++		case x, ok := <-ch:
++			if !ok {
++				// A closed channel is always ready to receive. Without
++				// this check the loop would append nil forever.
++				return
++			}
++			result = append(result, x)
++		default:
++			return
++		}
++	}
++}
++
++// sendall is an empty stub: it ignores data and ch and sends nothing.
++// NOTE(review): presumably meant to send each element of data on ch - confirm or remove.
++func sendall(data []interface{}, ch chan<- interface{}) {
++}
++
++// TestFlex checks that a FlexChannel delivers values in order, blocks senders
++// once the buffer limit is reached, and drains then closes Out after In is closed.
++func TestFlex(t *testing.T) {
++	fc := NewFlexChannel(5)
++
++	// Test send/receive
++	go func() {
++		for i := 0; i < 4; i++ {
++			fc.In <- i
++		}
++	}()
++
++	for i := 0; i < 4; i++ {
++		j := <-fc.Out
++		if i != j {
++			t.Errorf("%v != %v", i, j)
++		}
++	}
++	select {
++	case x, ok := <-fc.Out:
++		t.Error("receive empty channel got", x, ok)
++	default:
++	}
++
++	// Test buffer limit
++	for i := 10; i < 15; i++ {
++		fc.In <- i
++	}
++	select {
++	case fc.In <- 0:
++		t.Error("send to full channel did not block")
++	default:
++	}
++	i := <-fc.Out
++	if i != 10 {
++		t.Errorf("%v != %v", i, 10)
++	}
++	fc.In <- 15
++	close(fc.In)
++
++	for i := 11; i < 16; i++ {
++		j := <-fc.Out
++		if i != j {
++			t.Errorf("%v != %v", i, j)
++		}
++	}
++
++	x, ok := <-fc.Out
++	if ok {
++		t.Error("Unexpected value on Out", x)
++	}
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/safemap.go
----------------------------------------------------------------------
diff --cc internal/safemap.go
index 0000000,0000000..3a1fe2b
new file mode 100644
--- /dev/null
+++ b/internal/safemap.go
@@@ -1,0 -1,0 +1,57 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package internal
++
++import (
++ "sync"
++)
++
++// SafeMap is a map of interface{} to interface{} whose methods hold a mutex
++// for the duration of each access.
++type SafeMap struct {
++	m    map[interface{}]interface{}
++	lock sync.Mutex
++}
++
++// MakeSafeMap returns an empty SafeMap that is ready to use.
++func MakeSafeMap() SafeMap {
++	return SafeMap{m: map[interface{}]interface{}{}}
++}
++
++// Get returns the value stored under key, or nil if there is none.
++func (sm *SafeMap) Get(key interface{}) interface{} {
++	value, _ := sm.GetOk(key)
++	return value
++}
++
++// GetOk returns the value stored under key and whether the key was present.
++func (sm *SafeMap) GetOk(key interface{}) (interface{}, bool) {
++	sm.lock.Lock()
++	defer sm.lock.Unlock()
++	value, present := sm.m[key]
++	return value, present
++}
++
++// Put stores value under key, replacing any previous value.
++func (sm *SafeMap) Put(key, value interface{}) {
++	sm.lock.Lock()
++	defer sm.lock.Unlock()
++	sm.m[key] = value
++}
++
++// Delete removes key from the map, it does nothing if key is absent.
++func (sm *SafeMap) Delete(key interface{}) {
++	sm.lock.Lock()
++	defer sm.lock.Unlock()
++	delete(sm.m, key)
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/internal/uuid.go
----------------------------------------------------------------------
diff --cc internal/uuid.go
index 0000000,0000000..ef941a1
new file mode 100644
--- /dev/null
+++ b/internal/uuid.go
@@@ -1,0 -1,0 +1,70 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package internal
++
++import (
++ "fmt"
++ "math/rand"
++ "strconv"
++ "sync"
++ "sync/atomic"
++ "time"
++)
++
++// UUID is a 16 byte identifier.
++type UUID [16]byte
++
++// String formats the UUID as five dash-separated groups of upper-case
++// hexadecimal digits covering 4, 2, 2, 2 and 6 bytes.
++func (u UUID) String() string {
++	first, second, third := u[:4], u[4:6], u[6:8]
++	fourth, fifth := u[8:10], u[10:]
++	return fmt.Sprintf("%X-%X-%X-%X-%X", first, second, third, fourth, fifth)
++}
++
++// Don't mess with the default random source.
++var (
++	randomSource = rand.NewSource(time.Now().UnixNano())
++	randomLock   sync.Mutex
++)
++
++// random returns the low 8 bits of the next value from randomSource.
++// randomLock is held while the source is used.
++func random() byte {
++	randomLock.Lock()
++	next := randomSource.Int63()
++	randomLock.Unlock()
++	return byte(next)
++}
++
++// UUID4 returns a UUID filled with bytes from random(), with the version and
++// variant bits then overwritten as described in RFC 4122 section 4.4.
++// NOTE(review): random() draws from a math/rand source seeded with the
++// current time, so the result is not cryptographically unpredictable.
++func UUID4() UUID {
++ var u UUID
++ for i := 0; i < len(u); i++ {
++ u[i] = random()
++ }
++ // See https://tools.ietf.org/html/rfc4122#section-4.4
++ u[6] = (u[6] & 0x0F) | 0x40 // Version bits to 4
++ u[8] = (u[8] & 0x3F) | 0x80 // Variant bits (top two) set to 10
++ return u
++}
++
++// IdCounter is a simple atomic counter to generate unique 64 bit IDs.
++type IdCounter struct{ count uint64 }
++
++// NextInt atomically increments the counter and returns the new value.
++func (uc *IdCounter) NextInt() uint64 {
++	next := atomic.AddUint64(&uc.count, 1)
++	return next
++}
++
++// Next returns the next counter value formatted in base 32, which is safe
++// for NUL terminated C strings.
++func (uc *IdCounter) Next() string {
++	id := uc.NextInt()
++	return strconv.FormatUint(id, 32)
++}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/proton/doc.go
----------------------------------------------------------------------
diff --cc proton/doc.go
index 34f85fe,0000000..51f70f8
mode 100644,000000..100644
--- a/proton/doc.go
+++ b/proton/doc.go
@@@ -1,33 -1,0 +1,70 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+/*
- Package proton is a Go binding for the proton AMQP protocol engine.
++Package proton is an event-driven, concurrent-unsafe Go library for AMQP messaging.
++You can write clients and servers using this library.
+
- It alows you to construct and parse AMQP messages, and to implement AMQP
- clients, servers and intermediaries that can exchange messages with any
- AMQP 1.0 compliant endpoint.
++This package is a port of the Proton C API into Go (see
++http://qpid.apache.org/proton). Go programmers may find the 'electron' package
++more convenient. qpid.apache.org/electron provides a concurrent-safe API that
++allows you to run procedural loops in arbitrary goroutines rather than
++implementing event handlers that must run in a single goroutine.
++
++Consult the C API documentation at http://qpid.apache.org/proton for more
++information about the types here. There is a 1-1 correspondence between C type
++pn_foo_t and Go type proton.Foo, and between C function
++
++ pn_foo_do_something(pn_foo_t*, ...)
++
++and Go method
++
++ func (proton.Foo) DoSomething(...)
++
++The proton.Engine type pumps data between a Go net.Conn and a proton event loop
++goroutine that feeds events to a proton.MessagingHandler, which you must implement.
++See the Engine documentation for more.
++
++MessagingHandler defines an event handling interface that you can implement to
++react to AMQP protocol events. There is also a lower-level EventHandler, but
++MessagingHandler provides a simpler set of events and automates common tasks for you;
++for most applications it will be more convenient.
++
++NOTE: Methods on most types defined in this package (Sessions, Links etc.) can
++*only* be called in the event handler goroutine of the relevant
++Connection/Engine, either by the HandleEvent method of a handler type or in a
++function injected into the goroutine via Inject() or InjectWait(). Handlers and
++injected functions can set up channels to communicate with other goroutines.
++Note the Injecter associated with a handler is available as part of the Event
++value passed to HandleEvent.
++
++Separate Engine instances are independent, and can run concurrently.
++
++The 'electron' package is built on the proton package but instead offers a
++concurrent-safe API that can use simple procedural loops rather than event
++handlers to express application logic. It is easier to use for most
++applications.
+
- Encoding and decoding AMQP data follows the pattern of the standard
- encoding/json and encoding/xml packages.The mapping between AMQP and Go types is
- described in the documentation of the Marshal and Unmarshal functions.
+*/
+package proton
+
++// #cgo LDFLAGS: -lqpid-proton
++import "C"
++
+// This file is just for the package comment.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/8dc93cd0/proton/engine.go
----------------------------------------------------------------------
diff --cc proton/engine.go
index 0000000,0000000..2cebb49
new file mode 100644
--- /dev/null
+++ b/proton/engine.go
@@@ -1,0 -1,0 +1,402 @@@
++/*
++Licensed to the Apache Software Foundation (ASF) under one
++or more contributor license agreements. See the NOTICE file
++distributed with this work for additional information
++regarding copyright ownership. The ASF licenses this file
++to you under the Apache License, Version 2.0 (the
++"License"); you may not use this file except in compliance
++with the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++Unless required by applicable law or agreed to in writing,
++software distributed under the License is distributed on an
++"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
++KIND, either express or implied. See the License for the
++specific language governing permissions and limitations
++under the License.
++*/
++
++package proton
++
++// #include <proton/connection.h>
++// #include <proton/event.h>
++// #include <proton/handlers.h>
++// #include <proton/session.h>
++// #include <proton/transport.h>
++// #include <memory.h>
++// #include <stdlib.h>
++//
++// PN_HANDLE(REMOTE_ADDR)
++import "C"
++
++import (
++ "fmt"
++ "io"
++ "net"
++ "qpid.apache.org/internal"
++ "sync"
++ "unsafe"
++)
++
++// Injecter allows functions to be "injected" into the event-processing loop, to
++// be called in the same goroutine as event handlers.
++type Injecter interface {
++ // Inject a function into the engine goroutine.
++ //
++ // f() will be called in the same goroutine as event handlers, so it can safely
++ // use values belonging to event handlers without synchronization. f() should
++ // not block, no further events or injected functions can be processed until
++ // f() returns.
++ //
++ // Returns a non-nil error if the function could not be injected and will
++ // never be called. Otherwise the function will eventually be called.
++ //
++ // Note that proton values (Link, Session, Connection etc.) that existed when
++ // Inject(f) was called may have become invalid by the time f() is executed.
++ // Handlers should keep track of Closed events to ensure proton values
++ // are not used after they become invalid. One technique is to have a map from
++ // proton values to application values. Check that the map has the correct
++ // proton/application value pair at the start of the injected function and
++ // delete the value from the map when handling a Closed event.
++ Inject(f func()) error
++
++ // InjectWait is like Inject but does not return till f() has completed.
++ // If f() cannot be injected it returns the error from Inject(), otherwise
++ // it returns the error from f()
++ InjectWait(f func() error) error
++}
++
++// bufferChan pairs a channel of byte slices with two equally sized buffers
++// that are handed out alternately (ping-pong) by buffer().
++type bufferChan struct {
++	buffers    chan []byte
++	buf1, buf2 []byte
++}
++
++// newBufferChan returns a bufferChan with an unbuffered channel and two
++// buffers of size bytes each.
++func newBufferChan(size int) *bufferChan {
++	bc := new(bufferChan)
++	bc.buffers = make(chan []byte)
++	bc.buf1 = make([]byte, size)
++	bc.buf2 = make([]byte, size)
++	return bc
++}
++
++// buffer swaps the two buffers and returns the one that is now current,
++// extended to its full capacity.
++func (b *bufferChan) buffer() []byte {
++	next := b.buf2
++	b.buf2 = b.buf1
++	b.buf1 = next
++	return next[:cap(next)]
++}
++
++// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
++// Handler functions sequentially in a single goroutine. Actions taken by
++// Handler functions (such as sending messages) are encoded and written to the
++// net.Conn. You can create multiple Engines to handle multiple connections
++// concurrently.
++//
++// You implement the EventHandler and/or MessagingHandler interfaces and provide
++// those values to NewEngine(). Their HandleEvent method will be called in the
++// event-handling goroutine.
++//
++// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
++// other goroutines, store them, or use them as map indexes. Effectively they are
++// just pointers. Other goroutines cannot call their methods directly but they
++// can create a function closure to call such methods and pass it to Engine.Inject()
++// to have it evaluated in the engine goroutine.
++//
++// You are responsible for ensuring you don't use an event value after it is
++// invalid. The handler methods will tell you when a value is no longer valid. For
++// example after a LinkClosed event, that link is no longer valid. If you do
++// Link.Close() yourself (in a handler or injected function) the link remains valid
++// until the corresponding LinkClosed event is received by the handler.
++//
++// Engine.Close() will take care of cleaning up any remaining values when you are
++// done with the Engine. All values associated with an engine become invalid when you
++// call Engine.Close()
++//
++// The qpid.apache.org/electron package will do all this for you, so it
++// may be a better choice for some applications.
++//
++type Engine struct {
++ // err is set on exit from Run() if there was an error, see Error().
++ err internal.ErrorHolder
++ // inject carries functions passed to Inject() to the goroutine in Run().
++ inject chan func()
++
++ conn net.Conn
++ connection Connection
++ transport Transport
++ collector *C.pn_collector_t
++ read *bufferChan // Read buffers channel.
++ write *bufferChan // Write buffers channel.
++ handlers []EventHandler // Handlers for proton events.
++ running chan struct{} // This channel will be closed when the goroutines are done.
++ closeOnce sync.Once
++}
++
++const bufferSize = 4096
++
++// Map of Connection to *Engine
++var engines = internal.MakeSafeMap()
++
++// NewEngine initializes an engine with a connection and handlers. To start it running:
++//    eng, err := NewEngine(...)
++//    go eng.Run()
++// The goroutine will exit when the engine is closed or disconnected.
++// You can check for errors with Engine.Error().
++//
++// Returns a nil Engine and an error if the proton transport, connection or
++// collector cannot be allocated, or if the transport cannot be bound to the
++// connection. Any proton objects already allocated are freed in that case.
++//
++func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
++	eng := &Engine{
++		inject:     make(chan func()),
++		conn:       conn,
++		transport:  Transport{C.pn_transport()},
++		connection: Connection{C.pn_connection()},
++		collector:  C.pn_collector(),
++		handlers:   handlers,
++		read:       newBufferChan(bufferSize),
++		write:      newBufferChan(bufferSize),
++		running:    make(chan struct{}),
++	}
++	// release frees whatever was allocated above, in the same order Run()
++	// uses on exit, so that an error return does not leak C objects.
++	release := func() {
++		if !eng.connection.IsNil() {
++			eng.connection.Free()
++		}
++		if !eng.transport.IsNil() {
++			eng.transport.Free()
++		}
++		if eng.collector != nil {
++			C.pn_collector_free(eng.collector)
++		}
++	}
++	if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil {
++		release()
++		return nil, internal.Errorf("failed to allocate engine")
++	}
++
++	// TODO aconway 2015-06-25: connection settings for user, password, container etc.
++	// before transport.Bind() Set up connection before Engine, allow Engine or Reactor
++	// to run connection.
++
++	// Unique container-id by default.
++	eng.connection.SetContainer(internal.UUID4().String())
++	pnErr := eng.transport.Bind(eng.connection)
++	if pnErr != 0 {
++		release()
++		return nil, internal.Errorf("cannot setup engine: %s", internal.PnErrorCode(pnErr))
++	}
++	C.pn_connection_collect(eng.connection.pn, eng.collector)
++	eng.connection.Open()
++	// Save the connection ID for Connection.String()
++	connectionContexts.Put(eng.connection, connectionContext{eng.String()})
++	return eng, nil
++}
++
++// String returns the local and remote addresses of the engine's net.Conn
++// joined by a dash.
++func (eng *Engine) String() string {
++	local, remote := eng.conn.LocalAddr(), eng.conn.RemoteAddr()
++	return fmt.Sprintf("%s-%s", local, remote)
++}
++
++// Id returns an identifier for this Engine: the Engine's address formatted
++// with %p. It is the same on every call for a given Engine.
++func (eng *Engine) Id() string {
++	// The previous format "%eng" applied the floating-point verb %e to the
++	// address of the local receiver variable and appended a literal "ng".
++	return fmt.Sprintf("%p", eng)
++}
++
++// Error returns the error currently held by the engine, if any.
++func (eng *Engine) Error() error {
++	held := eng.err.Get()
++	return held
++}
++
++// Inject a function into the Engine's event loop.
++//
++// f() will be called in the same event-processing goroutine that calls Handler
++// methods. f() can safely call methods on values that belong to this engine
++// (Sessions, Links etc)
++//
++// The injected function has no parameters or return values. It is normally a
++// closure and can use channels to communicate with the injecting goroutine if
++// necessary.
++//
++// Returns a non-nil error if the engine is closed before the function could be
++// injected.
++func (eng *Engine) Inject(f func()) error {
++ select {
++ case eng.inject <- f:
++ return nil
++ case <-eng.running:
++ return eng.Error()
++ }
++}
++
++// InjectWait injects f like Inject, then waits until f() has returned or the
++// engine has stopped. It returns the error from Inject if f could not be
++// injected, the engine's error if the engine stopped first, and otherwise
++// the error returned by f().
++func (eng *Engine) InjectWait(f func() error) error {
++	result := make(chan error)
++	defer close(result)
++	if injectErr := eng.Inject(func() { result <- f() }); injectErr != nil {
++		return injectErr
++	}
++	select {
++	case fErr := <-result:
++		return fErr
++	case <-eng.running:
++		return eng.Error()
++	}
++}
++
++// Server puts the Engine in server mode, meaning it will auto-detect security settings on
++// the incoming connection such as use of SASL and SSL.
++// Must be called before Run()
++//
++func (eng *Engine) Server() { eng.transport.SetServer() }
++
++// Close records err on the engine, asks the event loop to close the
++// connection with that error, and returns once the engine has exited.
++func (eng *Engine) Close(err error) {
++	eng.err.Set(err)
++	closeConnection := func() { CloseError(eng.connection, err) }
++	eng.Inject(closeConnection)
++	<-eng.running
++}
++
++// Disconnect the engine's connection without an AMQP close, returns when the engine has exited.
++// It records err on the engine and closes the net.Conn directly.
++func (eng *Engine) Disconnect(err error) {
++ eng.err.Set(err)
++ eng.conn.Close()
++ <-eng.running
++}
++
++// Run the engine. Engine.Run() will exit when the engine is closed or
++// disconnected. You can check for errors after exit with Engine.Error().
++//
++// Run starts one goroutine that reads from the net.Conn and one that writes to
++// it, then loops in the calling goroutine until eng.err holds an error. The
++// loop moves bytes between those goroutines and the transport, runs injected
++// functions and dispatches collected events. On exit it closes the net.Conn,
++// waits for both goroutines, closes eng.running and frees the proton objects.
++//
++func (eng *Engine) Run() error {
++ wait := sync.WaitGroup{}
++ wait.Add(2) // Read and write goroutines
++
++ readErr := make(chan error, 1) // Don't block
++ go func() { // Read goroutine
++ defer wait.Done()
++ for {
++ rbuf := eng.read.buffer()
++ n, err := eng.conn.Read(rbuf)
++ if n > 0 {
++ // NOTE(review): this send is unbuffered and nothing receives from
++ // eng.read.buffers once the main loop below has exited, so
++ // wait.Wait() could block here - confirm.
++ eng.read.buffers <- rbuf[:n]
++ }
++ if err != nil {
++ readErr <- err
++ close(readErr)
++ close(eng.read.buffers)
++ return
++ }
++ }
++ }()
++
++ writeErr := make(chan error, 1) // Don't block
++ go func() { // Write goroutine
++ defer wait.Done()
++ for {
++ wbuf, ok := <-eng.write.buffers
++ if !ok {
++ return
++ }
++ _, err := eng.conn.Write(wbuf)
++ if err != nil {
++ writeErr <- err
++ close(writeErr)
++ return
++ }
++ }
++ }()
++
++ wbuf := eng.write.buffer()[:0]
++
++ for eng.err.Get() == nil {
++ // Refill the outgoing buffer from the transport once the last one was sent.
++ if len(wbuf) == 0 {
++ eng.pop(&wbuf)
++ }
++ // Don't set wchan unless there is something to write.
++ var wchan chan []byte
++ if len(wbuf) > 0 {
++ wchan = eng.write.buffers
++ }
++
++ select {
++ case buf, ok := <-eng.read.buffers: // Read a buffer
++ if ok {
++ eng.push(buf)
++ }
++ case wchan <- wbuf: // Write a buffer
++ wbuf = eng.write.buffer()[:0]
++ case f, ok := <-eng.inject: // Function injected from another goroutine
++ if ok {
++ f()
++ }
++ case err := <-readErr:
++ eng.netError(err)
++ case err := <-writeErr:
++ eng.netError(err)
++ }
++ // Dispatch any events produced by the case that just ran.
++ eng.process()
++ }
++ close(eng.write.buffers)
++ eng.conn.Close() // Make sure connection is closed
++ wait.Wait()
++ close(eng.running) // Signal goroutines have exited and Error is set.
++
++ connectionContexts.Delete(eng.connection)
++ if !eng.connection.IsNil() {
++ eng.connection.Free()
++ }
++ if !eng.transport.IsNil() {
++ eng.transport.Free()
++ }
++ if eng.collector != nil {
++ C.pn_collector_free(eng.collector)
++ }
++ // Handlers that wrap a C handler are freed as well.
++ for _, h := range eng.handlers {
++ switch h := h.(type) {
++ case cHandler:
++ C.pn_handler_free(h.pn)
++ }
++ }
++ return eng.err.Get()
++}
++
++// netError records err on the engine and closes both the head and the tail
++// of the transport. Run() calls it when the read or write goroutine reports
++// an error.
++func (eng *Engine) netError(err error) {
++ eng.err.Set(err)
++ eng.transport.CloseHead()
++ eng.transport.CloseTail()
++}
++
++// minInt returns the smaller of a and b.
++func minInt(a, b int) int {
++	smaller := b
++	if a < b {
++		smaller = a
++	}
++	return smaller
++}
++
++// pop copies pending output from the transport into *buf and removes the
++// copied bytes from the transport.
++//
++// On return *buf is resliced to the number of bytes copied, at most
++// cap(*buf). It is empty when the transport reports PN_EOS or has nothing
++// pending. Panics if the transport reports any other negative value.
++func (eng *Engine) pop(buf *[]byte) {
++	pending := int(eng.transport.Pending())
++	switch {
++	case pending == int(C.PN_EOS):
++		// Truncate explicitly. The previous (*buf)[:] left the caller's
++		// length unchanged, so stale bytes could have been reported.
++		*buf = (*buf)[:0]
++		return
++	case pending < 0:
++		panic(internal.Errorf("%s", internal.PnErrorCode(pending)))
++	}
++	size := minInt(pending, cap(*buf))
++	*buf = (*buf)[:size]
++	if size == 0 {
++		return
++	}
++	C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), C.size_t(size))
++	internal.Assert(size > 0)
++	eng.transport.Pop(uint(size))
++}
++
++// push feeds all of buf to the transport, calling Push repeatedly with the
++// part not yet accepted. Panics if Push returns zero or a negative value.
++func (eng *Engine) push(buf []byte) {
++	for remaining := buf; len(remaining) > 0; {
++		accepted := eng.transport.Push(remaining)
++		if accepted <= 0 {
++			panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(accepted)))
++		}
++		remaining = remaining[accepted:]
++	}
++}
++
++// handle passes e to every handler in order. After a transport-closed event
++// it records io.EOF as the engine's error.
++func (eng *Engine) handle(e Event) {
++	for _, handler := range eng.handlers {
++		handler.HandleEvent(e)
++	}
++	if e.Type() != ETransportClosed {
++		return
++	}
++	eng.err.Set(io.EOF)
++}
++
++// process handles every event currently in the collector. Each event is
++// handled while still at the head of the collector and popped afterwards.
++func (eng *Engine) process() {
++	for {
++		ce := C.pn_collector_peek(eng.collector)
++		if ce == nil {
++			return
++		}
++		eng.handle(makeEvent(ce, eng))
++		C.pn_collector_pop(eng.collector)
++	}
++}
++
++// Connection returns the proton Connection that belongs to this engine.
++func (eng *Engine) Connection() Connection {
++	return eng.connection
++}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org