You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/23 18:58:17 UTC
[49/50] [abbrv] qpid-proton git commit: Merge branch 'master' into go1
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/link.go
----------------------------------------------------------------------
diff --cc electron/link.go
index 4bef53b,0000000..91efa8e
mode 100644,000000..100644
--- a/electron/link.go
+++ b/electron/link.go
@@@ -1,247 -1,0 +1,245 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
- "qpid.apache.org/internal"
++ "fmt"
+ "qpid.apache.org/proton"
+)
+
+// Link is the common interface for AMQP links. Sender and Receiver provide
+// more methods for the sending or receiving end of a link respectively.
+type Link interface {
+ Endpoint
+
+ // Source address that messages are coming from.
+ Source() string
+
+ // Target address that messages are going to.
+ Target() string
+
+ // Name is a unique name for the link among links between the same
+ // containers in the same direction. By default generated automatically.
+ LinkName() string
+
+ // IsSender is true if this is the sending end of the link.
+ IsSender() bool
+
+ // IsReceiver is true if this is the receiving end of the link.
+ IsReceiver() bool
+
+ // SndSettle defines when the sending end of the link settles message delivery.
+ SndSettle() SndSettleMode
+
+ // RcvSettle defines when the sending end of the link settles message delivery.
+ RcvSettle() RcvSettleMode
+
+ // Session containing the Link
+ Session() Session
+
+ // Called in event loop on closed event.
+ closed(err error)
+ // Called to open a link (local or accepted incoming link)
+ open()
+}
+
- // LinkSetting can be passed when creating a sender or receiver.
- // See functions that return LinkSetting for details
- type LinkSetting func(*link)
++// LinkOption can be passed when creating a sender or receiver link to set optional configuration.
++type LinkOption func(*link)
+
- // Source sets address that messages are coming from.
- func Source(s string) LinkSetting { return func(l *link) { l.source = s } }
++// Source returns a LinkOption that sets address that messages are coming from.
++func Source(s string) LinkOption { return func(l *link) { l.source = s } }
+
- // Target sets address that messages are going to.
- func Target(s string) LinkSetting { return func(l *link) { l.target = s } }
++// Target returns a LinkOption that sets address that messages are going to.
++func Target(s string) LinkOption { return func(l *link) { l.target = s } }
+
- // LinkName sets the link name.
- func LinkName(s string) LinkSetting { return func(l *link) { l.target = s } }
++// LinkName returns a LinkOption that sets the link name.
++func LinkName(s string) LinkOption { return func(l *link) { l.target = s } }
+
- // SndSettle sets the send settle mode
- func SndSettle(m SndSettleMode) LinkSetting { return func(l *link) { l.sndSettle = m } }
++// SndSettle returns a LinkOption that sets the send settle mode
++func SndSettle(m SndSettleMode) LinkOption { return func(l *link) { l.sndSettle = m } }
+
- // RcvSettle sets the send settle mode
- func RcvSettle(m RcvSettleMode) LinkSetting { return func(l *link) { l.rcvSettle = m } }
++// RcvSettle returns a LinkOption that sets the send settle mode
++func RcvSettle(m RcvSettleMode) LinkOption { return func(l *link) { l.rcvSettle = m } }
+
- // SndSettleMode defines when the sending end of the link settles message delivery.
++// SndSettleMode returns a LinkOption that defines when the sending end of the
++// link settles message delivery.
+type SndSettleMode proton.SndSettleMode
+
- // Capacity sets the link capacity
- func Capacity(n int) LinkSetting { return func(l *link) { l.capacity = n } }
++// Capacity returns a LinkOption that sets the link capacity
++func Capacity(n int) LinkOption { return func(l *link) { l.capacity = n } }
+
- // Prefetch sets a receivers pre-fetch flag. Not relevant for a sender.
- func Prefetch(p bool) LinkSetting { return func(l *link) { l.prefetch = p } }
++// Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender.
++func Prefetch(p bool) LinkOption { return func(l *link) { l.prefetch = p } }
+
- // AtMostOnce sets "fire and forget" mode, messages are sent but no
- // acknowledgment is received, messages can be lost if there is a network
- // failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
- func AtMostOnce() LinkSetting {
++// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
++// are sent but no acknowledgment is received, messages can be lost if there is
++// a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
++func AtMostOnce() LinkOption {
+ return func(l *link) {
+ SndSettle(SndSettled)(l)
+ RcvSettle(RcvFirst)(l)
+ }
+}
+
- // AtLeastOnce requests acknowledgment for every message, acknowledgment
- // indicates the message was definitely received. In the event of a
- // failure, unacknowledged messages can be re-sent but there is a chance
- // that the message will be received twice in this case.
- // Sets SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
- func AtLeastOnce() LinkSetting {
++// AtLeastOnce returns a LinkOption that requests acknowledgment for every
++// message, acknowledgment indicates the message was definitely received. In the
++// event of a failure, unacknowledged messages can be re-sent but there is a
++// chance that the message will be received twice in this case. Sets
++// SndSettleMode=SndUnsettled and RcvSettleMode=RcvFirst
++func AtLeastOnce() LinkOption {
+ return func(l *link) {
+ SndSettle(SndUnsettled)(l)
+ RcvSettle(RcvFirst)(l)
+ }
+}
+
+const (
+ // Messages are sent unsettled
+ SndUnsettled = SndSettleMode(proton.SndUnsettled)
+ // Messages are sent already settled
+ SndSettled = SndSettleMode(proton.SndSettled)
+ // Sender can send either unsettled or settled messages.
+ SendMixed = SndSettleMode(proton.SndMixed)
+)
+
+// RcvSettleMode defines when the receiving end of the link settles message delivery.
+type RcvSettleMode proton.RcvSettleMode
+
+const (
+ // Receiver settles first.
+ RcvFirst = RcvSettleMode(proton.RcvFirst)
+ // Receiver waits for sender to settle before settling.
+ RcvSecond = RcvSettleMode(proton.RcvSecond)
+)
+
+type link struct {
+ endpoint
+
+ // Link settings.
+ source string
+ target string
+ linkName string
+ isSender bool
+ sndSettle SndSettleMode
+ rcvSettle RcvSettleMode
+ capacity int
+ prefetch bool
+
+ session *session
+ eLink proton.Link
- done chan struct{} // Closed when link is closed
+}
+
+func (l *link) Source() string { return l.source }
+func (l *link) Target() string { return l.target }
+func (l *link) LinkName() string { return l.linkName }
+func (l *link) IsSender() bool { return l.isSender }
+func (l *link) IsReceiver() bool { return !l.isSender }
+func (l *link) SndSettle() SndSettleMode { return l.sndSettle }
+func (l *link) RcvSettle() RcvSettleMode { return l.rcvSettle }
+func (l *link) Session() Session { return l.session }
+func (l *link) Connection() Connection { return l.session.Connection() }
+
+func (l *link) engine() *proton.Engine { return l.session.connection.engine }
+func (l *link) handler() *handler { return l.session.connection.handler }
+
+// Set up link fields and open the proton.Link
- func localLink(sn *session, isSender bool, setting ...LinkSetting) (link, error) {
++func localLink(sn *session, isSender bool, setting ...LinkOption) (link, error) {
+ l := link{
+ session: sn,
+ isSender: isSender,
+ capacity: 1,
+ prefetch: false,
- done: make(chan struct{}),
+ }
+ for _, set := range setting {
+ set(&l)
+ }
+ if l.linkName == "" {
+ l.linkName = l.session.connection.container.nextLinkName()
+ }
+ if l.IsSender() {
+ l.eLink = l.session.eSession.Sender(l.linkName)
+ } else {
+ l.eLink = l.session.eSession.Receiver(l.linkName)
+ }
+ if l.eLink.IsNil() {
- l.err.Set(internal.Errorf("cannot create link %s", l))
++ l.err.Set(fmt.Errorf("cannot create link %s", l))
+ return l, l.err.Get()
+ }
+ l.eLink.Source().SetAddress(l.source)
+ l.eLink.Target().SetAddress(l.target)
+ l.eLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
+ l.eLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
- l.str = l.eLink.String()
+ l.eLink.Open()
++ l.endpoint = makeEndpoint(l.eLink.String())
+ return l, nil
+}
+
+type incomingLink struct {
+ incoming
+ link
+}
+
+// Set up a link from an incoming proton.Link.
+func makeIncomingLink(sn *session, eLink proton.Link) incomingLink {
+ l := incomingLink{
++ incoming: makeIncoming(eLink),
+ link: link{
+ session: sn,
+ isSender: eLink.IsSender(),
+ eLink: eLink,
+ source: eLink.RemoteSource().Address(),
+ target: eLink.RemoteTarget().Address(),
+ linkName: eLink.Name(),
+ sndSettle: SndSettleMode(eLink.RemoteSndSettleMode()),
+ rcvSettle: RcvSettleMode(eLink.RemoteRcvSettleMode()),
+ capacity: 1,
+ prefetch: false,
- done: make(chan struct{}),
++ endpoint: makeEndpoint(eLink.String()),
+ },
+ }
- l.str = eLink.String()
+ return l
+}
+
- // Called in proton goroutine on closed or disconnected
- func (l *link) closed(err error) {
- l.err.Set(err)
- l.err.Set(Closed) // If no error set, mark as closed.
- close(l.done)
- }
-
+// Not part of Link interface but use by Sender and Receiver.
+func (l *link) Credit() (credit int, err error) {
+ err = l.engine().InjectWait(func() error {
++ if l.Error() != nil {
++ return l.Error()
++ }
+ credit = l.eLink.Credit()
+ return nil
+ })
+ return
+}
+
+// Not part of Link interface but use by Sender and Receiver.
+func (l *link) Capacity() int { return l.capacity }
+
+func (l *link) Close(err error) {
- l.engine().Inject(func() { localClose(l.eLink, err) })
++ l.engine().Inject(func() {
++ if l.Error() == nil {
++ localClose(l.eLink, err)
++ }
++ })
+}
+
+func (l *link) open() {
+ l.eLink.Open()
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/messaging_test.go
----------------------------------------------------------------------
diff --cc electron/messaging_test.go
index 36b0c24,0000000..5af57e8
mode 100644,000000..100644
--- a/electron/messaging_test.go
+++ b/electron/messaging_test.go
@@@ -1,416 -1,0 +1,426 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "fmt"
+ "net"
+ "path"
+ "qpid.apache.org/amqp"
+ "runtime"
+ "testing"
+ "time"
+)
+
+func fatalIf(t *testing.T, err error) {
+ if err != nil {
+ _, file, line, ok := runtime.Caller(1) // annotate with location of caller.
+ if ok {
+ _, file = path.Split(file)
+ }
+ t.Fatalf("(from %s:%d) %v", file, line, err)
+ }
+}
+
- // Start a server, return listening addr and channel for incoming Connection.
- func newServer(t *testing.T, cont Container, accept func(Endpoint) error) (net.Addr, <-chan Connection) {
++// Start a server, return listening addr and channel for incoming Connections.
++func newServer(t *testing.T, cont Container) (net.Addr, <-chan Connection) {
+ listener, err := net.Listen("tcp", "")
+ fatalIf(t, err)
+ addr := listener.Addr()
+ ch := make(chan Connection)
+ go func() {
+ conn, err := listener.Accept()
- c, err := cont.Connection(conn)
++ c, err := cont.Connection(conn, Server(), AllowIncoming())
+ fatalIf(t, err)
- c.Server()
- c.Listen(accept)
- fatalIf(t, c.Open())
+ ch <- c
+ }()
+ return addr, ch
+}
+
- // Return open an client connection and session, return the session.
++// Open a client connection and session, return the session.
+func newClient(t *testing.T, cont Container, addr net.Addr) Session {
+ conn, err := net.Dial(addr.Network(), addr.String())
+ fatalIf(t, err)
+ c, err := cont.Connection(conn)
+ fatalIf(t, err)
- c.Open()
+ sn, err := c.Session()
+ fatalIf(t, err)
+ return sn
+}
+
+// Return client and server ends of the same connection.
- func newClientServer(t *testing.T, accept func(Endpoint) error) (client Session, server Connection) {
- addr, ch := newServer(t, NewContainer(""), accept)
- client = newClient(t, NewContainer(""), addr)
++func newClientServer(t *testing.T) (client Session, server Connection) {
++ addr, ch := newServer(t, NewContainer("test-server"))
++ client = newClient(t, NewContainer("test-client"), addr)
+ return client, <-ch
+}
+
+// Close client and server
+func closeClientServer(client Session, server Connection) {
+ client.Connection().Close(nil)
+ server.Close(nil)
+}
+
+// Send a message one way with a client sender and server receiver, verify ack.
+func TestClientSendServerReceive(t *testing.T) {
- timeout := time.Second * 2
+ nLinks := 3
+ nMessages := 3
+
+ rchan := make(chan Receiver, nLinks)
- client, server := newClientServer(t, func(ep Endpoint) error {
- if r, ok := ep.(Receiver); ok {
- r.SetCapacity(1, false)
- rchan <- r
++ client, server := newClientServer(t)
++ go func() {
++ for in := range server.Incoming() {
++ switch in := in.(type) {
++ case *IncomingReceiver:
++ in.SetCapacity(1)
++ in.SetPrefetch(false)
++ rchan <- in.Accept().(Receiver)
++ default:
++ in.Accept()
++ }
+ }
- return nil
- })
-
- defer func() {
- closeClientServer(client, server)
+ }()
+
++ defer func() { closeClientServer(client, server) }()
++
+ s := make([]Sender, nLinks)
+ for i := 0; i < nLinks; i++ {
+ var err error
+ s[i], err = client.Sender(Target(fmt.Sprintf("foo%d", i)))
+ if err != nil {
+ t.Fatal(err)
+ }
+ }
+ r := make([]Receiver, nLinks)
+ for i := 0; i < nLinks; i++ {
+ r[i] = <-rchan
+ }
+
+ for i := 0; i < nLinks; i++ {
+ for j := 0; j < nMessages; j++ {
- var sm SentMessage
-
+ // Client send
++ ack := make(chan Outcome, 1)
+ sendDone := make(chan struct{})
+ go func() {
+ defer close(sendDone)
+ m := amqp.NewMessageWith(fmt.Sprintf("foobar%v-%v", i, j))
+ var err error
- sm, err = s[i].Send(m)
++ s[i].SendAsync(m, ack, "testing")
+ if err != nil {
+ t.Fatal(err)
+ }
+ }()
+
+ // Server recieve
+ rm, err := r[i].Receive()
+ if err != nil {
+ t.Fatal(err)
+ }
+ if want, got := interface{}(fmt.Sprintf("foobar%v-%v", i, j)), rm.Message.Body(); want != got {
+ t.Errorf("%#v != %#v", want, got)
+ }
+
+ // Should not be acknowledged on client yet
+ <-sendDone
- if d, err := sm.DispositionTimeout(0); err != Timeout || NoDisposition != d {
- t.Errorf("want [no-disposition/timeout] got [%v/%v]", d, err)
++ select {
++ case <-ack:
++ t.Errorf("unexpected ack")
++ default:
+ }
- // Server ack
- if err := rm.Acknowledge(Rejected); err != nil {
++
++ // Server send ack
++ if err := rm.Reject(); err != nil {
+ t.Error(err)
+ }
+ // Client get ack.
- if d, err := sm.DispositionTimeout(timeout); err != nil || Rejected != d {
- t.Errorf("want [rejected/nil] got [%v/%v]", d, err)
++ if a := <-ack; a.Value != "testing" || a.Error != nil || a.Status != Rejected {
++ t.Error("unexpected ack: ", a.Status, a.Error, a.Value)
+ }
+ }
+ }
+}
+
+func TestClientReceiver(t *testing.T) {
+ nMessages := 3
- client, server := newClientServer(t, func(ep Endpoint) error {
- if s, ok := ep.(Sender); ok {
- go func() {
- for i := int32(0); i < int32(nMessages); i++ {
- sm, err := s.Send(amqp.NewMessageWith(i))
- if err != nil {
- t.Error(err)
- return
- } else {
- sm.Disposition() // Sync send.
++ client, server := newClientServer(t)
++ go func() {
++ for in := range server.Incoming() {
++ switch in := in.(type) {
++ case *IncomingSender:
++ s := in.Accept().(Sender)
++ go func() {
++ for i := int32(0); i < int32(nMessages); i++ {
++ out := s.SendSync(amqp.NewMessageWith(i))
++ if out.Error != nil {
++ t.Error(out.Error)
++ return
++ }
+ }
- }
- s.Close(nil)
- }()
++ s.Close(nil)
++ }()
++ default:
++ in.Accept()
++ }
+ }
- return nil
- })
++ }()
+
+ r, err := client.Receiver(Source("foo"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ for i := int32(0); i < int32(nMessages); i++ {
+ rm, err := r.Receive()
+ if err != nil {
+ if err != Closed {
+ t.Error(err)
+ }
+ break
+ }
+ if err := rm.Accept(); err != nil {
+ t.Error(err)
+ }
+ if b, ok := rm.Message.Body().(int32); !ok || b != i {
+ t.Errorf("want %v, true got %v, %v", i, b, ok)
+ }
+ }
+ server.Close(nil)
+ client.Connection().Close(nil)
+}
+
+// Test timeout versions of waiting functions.
+func TestTimeouts(t *testing.T) {
+ var err error
+ rchan := make(chan Receiver, 1)
- client, server := newClientServer(t, func(ep Endpoint) error {
- if r, ok := ep.(Receiver); ok {
- r.SetCapacity(1, false) // Issue credit only on receive
- rchan <- r
++ client, server := newClientServer(t)
++ go func() {
++ for i := range server.Incoming() {
++ switch i := i.(type) {
++ case *IncomingReceiver:
++ i.SetCapacity(1)
++ i.SetPrefetch(false)
++ rchan <- i.Accept().(Receiver) // Issue credit only on receive
++ default:
++ i.Accept()
++ }
+ }
- return nil
- })
++ }()
+ defer func() { closeClientServer(client, server) }()
+
+ // Open client sender
+ snd, err := client.Sender(Target("test"))
+ if err != nil {
+ t.Fatal(err)
+ }
+ rcv := <-rchan
+
+ // Test send with timeout
+ short := time.Millisecond
+ long := time.Second
+ m := amqp.NewMessage()
- if _, err = snd.SendTimeout(m, 0); err != Timeout { // No credit, expect timeout.
++ if err := snd.SendSyncTimeout(m, 0).Error; err != Timeout { // No credit, expect timeout.
+ t.Error("want Timeout got", err)
+ }
- if _, err = snd.SendTimeout(m, short); err != Timeout { // No credit, expect timeout.
++ if err := snd.SendSyncTimeout(m, short).Error; err != Timeout { // No credit, expect timeout.
+ t.Error("want Timeout got", err)
+ }
+ // Test receive with timeout
+ if _, err = rcv.ReceiveTimeout(0); err != Timeout { // No credit, expect timeout.
+ t.Error("want Timeout got", err)
+ }
+ // Test receive with timeout
+ if _, err = rcv.ReceiveTimeout(short); err != Timeout { // No credit, expect timeout.
+ t.Error("want Timeout got", err)
+ }
+ // There is now a credit on the link due to receive
- sm, err := snd.SendTimeout(m, long)
- if err != nil {
- t.Fatal(err)
- }
++ ack := make(chan Outcome)
++ snd.SendAsyncTimeout(m, ack, nil, short)
+ // Disposition should timeout
- if _, err = sm.DispositionTimeout(long); err != Timeout {
- t.Error("want Timeout got", err)
- }
- if _, err = sm.DispositionTimeout(short); err != Timeout {
- t.Error("want Timeout got", err)
++ select {
++ case <-ack:
++ t.Errorf("want Timeout got %#v", ack)
++ case <-time.After(short):
+ }
++
+ // Receive and accept
+ rm, err := rcv.ReceiveTimeout(long)
+ if err != nil {
+ t.Fatal(err)
+ }
+ rm.Accept()
+ // Sender get ack
- d, err := sm.DispositionTimeout(long)
- if err != nil || d != Accepted {
- t.Errorf("want (rejected, nil) got (%v, %v)", d, err)
++ if a := <-ack; a.Status != Accepted || a.Error != nil {
++ t.Errorf("want (accepted, nil) got %#v", a)
+ }
+}
+
+// clientServer that returns sender/receiver pairs at opposite ends of link.
+type pairs struct {
+ t *testing.T
+ client Session
+ server Connection
+ rchan chan Receiver
+ schan chan Sender
+}
+
+func newPairs(t *testing.T) *pairs {
+ p := &pairs{t: t, rchan: make(chan Receiver, 1), schan: make(chan Sender, 1)}
- p.client, p.server = newClientServer(t, func(ep Endpoint) error {
- switch ep := ep.(type) {
- case Receiver:
- ep.SetCapacity(1, false)
- p.rchan <- ep
- case Sender:
- p.schan <- ep
++ p.client, p.server = newClientServer(t)
++ go func() {
++ for i := range p.server.Incoming() {
++ switch i := i.(type) {
++ case *IncomingReceiver:
++ i.SetCapacity(1)
++ i.SetPrefetch(false)
++ p.rchan <- i.Accept().(Receiver)
++ case *IncomingSender:
++ p.schan <- i.Accept().(Sender)
++ default:
++ i.Accept()
++ }
+ }
- return nil
- })
++ }()
+ return p
+}
+
+func (p *pairs) close() {
+ closeClientServer(p.client, p.server)
+}
+
+func (p *pairs) senderReceiver() (Sender, Receiver) {
+ snd, err := p.client.Sender()
+ fatalIf(p.t, err)
+ rcv := <-p.rchan
+ return snd, rcv
+}
+
+func (p *pairs) receiverSender() (Receiver, Sender) {
+ rcv, err := p.client.Receiver()
+ fatalIf(p.t, err)
+ snd := <-p.schan
+ return rcv, snd
+}
+
+type result struct {
+ label string
+ err error
+}
+
+func (r result) String() string { return fmt.Sprintf("%s(%s)", r.err, r.label) }
+
+func doSend(snd Sender, results chan result) {
- _, err := snd.Send(amqp.NewMessage())
++ err := snd.SendSync(amqp.NewMessage()).Error
+ results <- result{"send", err}
+}
+
+func doReceive(rcv Receiver, results chan result) {
+ _, err := rcv.Receive()
+ results <- result{"receive", err}
+}
+
- func doDisposition(sm SentMessage, results chan result) {
- _, err := sm.Disposition()
- results <- result{"disposition", err}
++func doDisposition(ack <-chan Outcome, results chan result) {
++ results <- result{"disposition", (<-ack).Error}
+}
+
+// Test that closing Links interrupts blocked link functions.
+func TestLinkCloseInterrupt(t *testing.T) {
+ want := amqp.Errorf("x", "all bad")
+ pairs := newPairs(t)
+ results := make(chan result) // Collect expected errors
+
+ // Sender.Close() interrupts Send()
+ snd, rcv := pairs.senderReceiver()
+ go doSend(snd, results)
+ snd.Close(want)
+ if r := <-results; want != r.err {
+ t.Errorf("want %#v got %#v", want, r)
+ }
+
+ // Remote Receiver.Close() interrupts Send()
+ snd, rcv = pairs.senderReceiver()
+ go doSend(snd, results)
+ rcv.Close(want)
+ if r := <-results; want != r.err {
+ t.Errorf("want %#v got %#v", want, r)
+ }
+
+ // Receiver.Close() interrupts Receive()
+ snd, rcv = pairs.senderReceiver()
+ go doReceive(rcv, results)
+ rcv.Close(want)
+ if r := <-results; want != r.err {
+ t.Errorf("want %#v got %#v", want, r)
+ }
+
+ // Remote Sender.Close() interrupts Receive()
+ snd, rcv = pairs.senderReceiver()
+ go doReceive(rcv, results)
+ snd.Close(want)
+ if r := <-results; want != r.err {
+ t.Errorf("want %#v got %#v", want, r)
+ }
+}
+
+// Test closing the server end of a connection.
+func TestConnectionCloseInterrupt1(t *testing.T) {
+ want := amqp.Errorf("x", "bad")
+ pairs := newPairs(t)
+ results := make(chan result) // Collect expected errors
+
+ // Connection.Close() interrupts Send, Receive, Disposition.
+ snd, rcv := pairs.senderReceiver()
+ go doReceive(rcv, results)
- sm, err := snd.Send(amqp.NewMessage())
- fatalIf(t, err)
- go doDisposition(sm, results)
++ ack := snd.SendWaitable(amqp.NewMessage())
++ go doDisposition(ack, results)
+ snd, rcv = pairs.senderReceiver()
+ go doSend(snd, results)
+ rcv, snd = pairs.receiverSender()
+ go doReceive(rcv, results)
+ pairs.server.Close(want)
+ for i := 0; i < 3; i++ {
+ if r := <-results; want != r.err {
+ // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil and EOF.
- t.Logf("want %v got %v", want, r)
++ t.Logf("want %v got %v", want, r.err)
+ }
+ }
+}
+
+// Test closing the client end of the connection.
+func TestConnectionCloseInterrupt2(t *testing.T) {
+ want := amqp.Errorf("x", "bad")
+ pairs := newPairs(t)
+ results := make(chan result) // Collect expected errors
+
+ // Connection.Close() interrupts Send, Receive, Disposition.
+ snd, rcv := pairs.senderReceiver()
+ go doReceive(rcv, results)
- sm, err := snd.Send(amqp.NewMessage())
- fatalIf(t, err)
- go doDisposition(sm, results)
++ ack := snd.SendWaitable(amqp.NewMessage())
++ go doDisposition(ack, results)
+ snd, rcv = pairs.senderReceiver()
+ go doSend(snd, results)
+ rcv, snd = pairs.receiverSender()
+ go doReceive(rcv, results)
+ pairs.client.Close(want)
+ for i := 0; i < 3; i++ {
+ if r := <-results; want != r.err {
+ // TODO aconway 2015-10-06: Not propagating the correct error, seeing nil.
- t.Logf("want %v got %v", want, r)
++ t.Logf("want %v got %v", want, r.err)
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/receiver.go
----------------------------------------------------------------------
diff --cc electron/receiver.go
index 59ac018,0000000..22bdc7e
mode 100644,000000..100644
--- a/electron/receiver.go
+++ b/electron/receiver.go
@@@ -1,238 -1,0 +1,244 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
++ "fmt"
+ "qpid.apache.org/amqp"
- "qpid.apache.org/internal"
+ "qpid.apache.org/proton"
+ "time"
+)
+
+// Receiver is a Link that receives messages.
+//
+type Receiver interface {
+ Link
+
+ // Receive blocks until a message is available or until the Receiver is closed
+ // and has no more buffered messages.
+ Receive() (ReceivedMessage, error)
+
+ // ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
+ //
+ // Note that that if Prefetch is false, after a Timeout the credit issued by
+ // Receive remains on the link. It will be used by the next call to Receive.
+ ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)
+
+ // Prefetch==true means the Receiver will automatically issue credit to the
+ // remote sender to keep its buffer as full as possible, i.e. it will
+ // "pre-fetch" messages independently of the application calling
+ // Receive(). This gives good throughput for applications that handle a
+ // continuous stream of messages. Larger capacity may improve throughput, the
+ // optimal value depends on the characteristics of your application.
+ //
+ // Prefetch==false means the Receiver will issue only issue credit when you
+ // call Receive(), and will only issue enough credit to satisfy the calls
+ // actually made. This gives lower throughput but will not fetch any messages
+ // in advance. It is good for synchronous applications that need to evaluate
+ // each message before deciding whether to receive another. The
+ // request-response pattern is a typical example. If you make concurrent
+ // calls to Receive with pre-fetch disabled, you can improve performance by
+ // setting the capacity close to the expected number of concurrent calls.
+ //
+ Prefetch() bool
+
+ // Capacity is the size (number of messages) of the local message buffer
+ // These are messages received but not yet returned to the application by a call to Receive()
+ Capacity() int
+}
+
+// Flow control policy for a receiver.
+type policy interface {
+ // Called at the start of Receive() to adjust credit before fetching a message.
+ Pre(*receiver)
+ // Called after Receive() has received a message from Buffer() before it returns.
+ // Non-nil error means no message was received because of an error.
+ Post(*receiver, error)
+}
+
+type prefetchPolicy struct{}
+
+func (p prefetchPolicy) Flow(r *receiver) {
+ r.engine().Inject(func() {
++ if r.Error() != nil {
++ return
++ }
+ _, _, max := r.credit()
+ if max > 0 {
+ r.eLink.Flow(max)
+ }
+ })
+}
+func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) }
+func (p prefetchPolicy) Post(r *receiver, err error) {
+ if err == nil {
+ p.Flow(r)
+ }
+}
+
+type noPrefetchPolicy struct{ waiting int }
+
+func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine
+ r.engine().Inject(func() {
++ if r.Error() != nil {
++ return
++ }
+ len, credit, max := r.credit()
+ add := p.waiting - (len + credit)
+ if add > max {
+ add = max // Don't overflow
+ }
+ if add > 0 {
+ r.eLink.Flow(add)
+ }
+ })
+}
+func (p noPrefetchPolicy) Pre(r *receiver) { p.waiting++; p.Flow(r) }
+func (p noPrefetchPolicy) Post(r *receiver, err error) {
+ p.waiting--
+ if err == nil {
+ p.Flow(r)
+ }
+}
+
+// Receiver implementation
+type receiver struct {
+ link
+ buffer chan ReceivedMessage
+ policy policy
+}
+
++// Call in proton goroutine
+func newReceiver(l link) *receiver {
+ r := &receiver{link: l}
+ if r.capacity < 1 {
+ r.capacity = 1
+ }
+ if r.prefetch {
+ r.policy = &prefetchPolicy{}
+ } else {
+ r.policy = &noPrefetchPolicy{}
+ }
+ r.buffer = make(chan ReceivedMessage, r.capacity)
+ r.handler().addLink(r.eLink, r)
+ r.link.open()
+ return r
+}
+
+// call in proton goroutine.
+func (r *receiver) credit() (buffered, credit, max int) {
+ return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer)
+}
+
+func (r *receiver) Capacity() int { return cap(r.buffer) }
+func (r *receiver) Prefetch() bool { return r.prefetch }
+
+func (r *receiver) Receive() (rm ReceivedMessage, err error) {
+ return r.ReceiveTimeout(Forever)
+}
+
+func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
- internal.Assert(r.buffer != nil, "Receiver is not open: %s", r)
++ assert(r.buffer != nil, "Receiver is not open: %s", r)
+ r.policy.Pre(r)
+ defer func() { r.policy.Post(r, err) }()
+ rmi, err := timedReceive(r.buffer, timeout)
+ switch err {
+ case Timeout:
+ return ReceivedMessage{}, Timeout
+ case Closed:
+ return ReceivedMessage{}, r.Error()
+ default:
+ return rmi.(ReceivedMessage), nil
+ }
+}
+
+// Called in proton goroutine on MMessage event.
+func (r *receiver) message(delivery proton.Delivery) {
+ if r.eLink.State().RemoteClosed() {
+ localClose(r.eLink, r.eLink.RemoteCondition().Error())
+ return
+ }
+ if delivery.HasMessage() {
+ m, err := delivery.Message()
+ if err != nil {
+ localClose(r.eLink, err)
+ return
+ }
- internal.Assert(m != nil)
++ assert(m != nil)
+ r.eLink.Advance()
+ if r.eLink.Credit() < 0 {
- localClose(r.eLink, internal.Errorf("received message in excess of credit limit"))
++ localClose(r.eLink, fmt.Errorf("received message in excess of credit limit"))
+ } else {
+ // We never issue more credit than cap(buffer) so this will not block.
+ r.buffer <- ReceivedMessage{m, delivery, r}
+ }
+ }
+}
+
+func (r *receiver) closed(err error) {
+ r.link.closed(err)
+ if r.buffer != nil {
+ close(r.buffer)
+ }
+}
+
+// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
+type ReceivedMessage struct {
+ // Message is the received message.
+ Message amqp.Message
+
+ eDelivery proton.Delivery
+ receiver Receiver
+}
+
- // Acknowledge a ReceivedMessage with the given disposition code.
- func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error {
- return rm.receiver.(*receiver).engine().InjectWait(func() error {
- // Settle doesn't return an error but if the receiver is broken the settlement won't happen.
- rm.eDelivery.SettleAs(uint64(disposition))
- return rm.receiver.Error()
++// Acknowledge a ReceivedMessage with the given delivery status.
++func (rm *ReceivedMessage) acknowledge(status uint64) error {
++ return rm.receiver.(*receiver).engine().Inject(func() {
++ // Deliveries are valid as long as the connection is, unless settled.
++ rm.eDelivery.SettleAs(uint64(status))
+ })
+}
+
- // Accept is short for Acknowledge(Accpeted)
- func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
++// Accept tells the sender that we take responsibility for processing the message.
++func (rm *ReceivedMessage) Accept() error { return rm.acknowledge(proton.Accepted) }
++
++// Reject tells the sender we consider the message invalid and unusable.
++func (rm *ReceivedMessage) Reject() error { return rm.acknowledge(proton.Rejected) }
+
- // Reject is short for Acknowledge(Rejected)
- func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
++// Release tells the sender we will not process the message but some other
++// receiver might.
++func (rm *ReceivedMessage) Release() error { return rm.acknowledge(proton.Released) }
+
- // IncomingReceiver is passed to the accept() function given to Connection.Listen()
- // when there is an incoming request for a receiver link.
++// IncomingReceiver is sent on the Connection.Incoming() channel when there is
++// an incoming request to open a receiver link.
+type IncomingReceiver struct {
+ incomingLink
+}
+
- // Link provides information about the incoming link.
- func (i *IncomingReceiver) Link() Link { return i }
++// SetCapacity sets the capacity of the incoming receiver, call before Accept()
++func (in *IncomingReceiver) SetCapacity(capacity int) { in.capacity = capacity }
+
- // AcceptReceiver sets Capacity and Prefetch of the accepted Receiver.
- func (i *IncomingReceiver) AcceptReceiver(capacity int, prefetch bool) Receiver {
- i.capacity = capacity
- i.prefetch = prefetch
- return i.Accept().(Receiver)
- }
++// SetPrefetch sets the pre-fetch mode of the incoming receiver, call before Accept()
++func (in *IncomingReceiver) SetPrefetch(prefetch bool) { in.prefetch = prefetch }
+
- func (i *IncomingReceiver) Accept() Endpoint {
- i.accepted = true
- return newReceiver(i.link)
++// Accept accepts an incoming receiver endpoint
++func (in *IncomingReceiver) Accept() Endpoint {
++ return in.accept(func() Endpoint { return newReceiver(in.link) })
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/sender.go
----------------------------------------------------------------------
diff --cc electron/sender.go
index 68cfbb3,0000000..573e9da
mode 100644,000000..100644
--- a/electron/sender.go
+++ b/electron/sender.go
@@@ -1,315 -1,0 +1,274 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+// #include <proton/disposition.h>
+import "C"
+
+import (
+ "qpid.apache.org/amqp"
- "qpid.apache.org/internal"
+ "qpid.apache.org/proton"
- "reflect"
+ "time"
+)
+
+// Sender is a Link that sends messages.
++//
++// The result of sending a message is provided by an Outcome value.
++//
++// A sender can buffer messages up to the credit limit provided by the remote receiver.
++// Send* methods will block if the buffer is full until there is space.
++// Send*Timeout methods will give up after the timeout and set Timeout as Outcome.Error.
++//
+type Sender interface {
+ Link
+
- // Send a message without waiting for acknowledgement. Returns a SentMessage.
- // use SentMessage.Disposition() to wait for acknowledgement and get the
- // disposition code.
- //
- // If the send buffer is full, send blocks until there is space in the buffer.
- Send(m amqp.Message) (sm SentMessage, err error)
++ // SendSync sends a message and blocks until the message is acknowledged by the remote receiver.
++ // Returns an Outcome, which may contain an error if the message could not be sent.
++ SendSync(m amqp.Message) Outcome
++
++ // SendWaitable puts a message in the send buffer and returns a channel that
++ // you can use to wait for the Outcome of just that message. The channel is
++ // buffered so you can receive from it whenever you want without blocking anything.
++ SendWaitable(m amqp.Message) <-chan Outcome
++
++ // SendForget buffers a message for sending and returns, with no notification of the outcome.
++ SendForget(m amqp.Message)
+
- // SendTimeout is like send but only waits up to timeout for buffer space.
++ // SendAsync puts a message in the send buffer and returns immediately. An
++ // Outcome with Value = value will be sent to the ack channel when the remote
++ // receiver has acknowledged the message or if there is an error.
+ //
- // Returns Timeout error if the timeout expires and the message has not been sent.
- SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err error)
++ // You can use the same ack channel for many calls to SendAsync(), possibly on
++ // many Senders. The channel will receive the outcomes in the order they
++ // become available. The channel should be buffered and/or served by dedicated
++ // goroutines to avoid blocking the connection.
++ //
++ // If ack == nil no Outcome is sent.
++ SendAsync(m amqp.Message, ack chan<- Outcome, value interface{})
+
- // Send a message and forget it, there will be no acknowledgement.
- // If the send buffer is full, send blocks until there is space in the buffer.
- SendForget(m amqp.Message) error
++ SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, value interface{}, timeout time.Duration)
+
- // SendForgetTimeout is like send but only waits up to timeout for buffer space.
- // Returns Timeout error if the timeout expires and the message has not been sent.
- SendForgetTimeout(m amqp.Message, timeout time.Duration) error
++ SendWaitableTimeout(m amqp.Message, timeout time.Duration) <-chan Outcome
+
- // Credit indicates how many messages the receiving end of the link can accept.
- //
- // On a Sender credit can be negative, meaning that messages in excess of the
- // receiver's credit limit have been buffered locally till credit is available.
- Credit() (int, error)
- }
++ SendForgetTimeout(m amqp.Message, timeout time.Duration)
+
- type sendMessage struct {
- m amqp.Message
- sm SentMessage
++ SendSyncTimeout(m amqp.Message, timeout time.Duration) Outcome
+}
+
- type sender struct {
- link
- credit chan struct{} // Signal available credit.
++// Outcome provides information about the outcome of sending a message.
++type Outcome struct {
++ // Status of the message: was it sent, how was it acknowledged.
++ Status SentStatus
++ // Error is a local error if Status is Unsent or Unacknowledged, a remote error otherwise.
++ Error error
++ // Value provided by the application in SendAsync()
++ Value interface{}
+}
+
- // Disposition indicates the outcome of a settled message delivery.
- type Disposition uint64
++// SentStatus indicates the status of a sent message.
++type SentStatus int
+
+const (
- // No disposition available: pre-settled, not yet acknowledged or an error occurred
- NoDisposition Disposition = 0
++ // Message was never sent
++ Unsent SentStatus = iota
++ // Message was sent but never acknowledged. It may or may not have been received.
++ Unacknowledged
++ // Message was sent pre-settled, no remote outcome is available.
++ Presettled
+ // Message was accepted by the receiver
- Accepted = proton.Accepted
++ Accepted
+ // Message was rejected as invalid by the receiver
- Rejected = proton.Rejected
- // Message was not processed by the receiver but may be processed by some other receiver.
- Released = proton.Released
++ Rejected
++ // Message was not processed by the receiver but may be valid for a different receiver
++ Released
++ // Receiver responded with an unrecognized status.
++ Unknown
+)
+
- // String human readable name for a Disposition.
- func (d Disposition) String() string {
- switch d {
- case NoDisposition:
- return "no-disposition"
++// String human readable name for SentStatus.
++func (s SentStatus) String() string {
++ switch s {
++ case Unsent:
++ return "unsent"
++ case Unacknowledged:
++ return "unacknowledged"
+ case Accepted:
+ return "accepted"
+ case Rejected:
+ return "rejected"
+ case Released:
+ return "released"
- default:
++ case Unknown:
+ return "unknown"
++ default:
++ return "invalid"
+ }
+}
+
- func (s *sender) Send(m amqp.Message) (SentMessage, error) {
- return s.SendTimeout(m, Forever)
- }
-
- func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) (SentMessage, error) {
- var sm SentMessage
- if s.sndSettle == SndSettled {
- sm = nil
- } else {
- sm = newSentMessage(s.session.connection)
++// Convert proton delivery state code to SentStatus value
++func sentStatus(d uint64) SentStatus {
++ switch d {
++ case proton.Accepted:
++ return Accepted
++ case proton.Rejected:
++ return Rejected
++ case proton.Released, proton.Modified:
++ return Released
++ default:
++ return Unknown
+ }
- return s.sendInternal(sendMessage{m, sm}, timeout)
- }
-
- func (s *sender) SendForget(m amqp.Message) error {
- return s.SendForgetTimeout(m, Forever)
+}
+
- func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error {
- snd := sendMessage{m, nil}
- _, err := s.sendInternal(snd, timeout)
- return err
++// Sender implementation, held by handler.
++type sender struct {
++ link
++ credit chan struct{} // Signal available credit.
+}
+
- func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) {
- if _, err := timedReceive(s.credit, timeout); err != nil { // Wait for credit
- if err == Closed {
++func (s *sender) SendAsyncTimeout(m amqp.Message, ack chan<- Outcome, v interface{}, t time.Duration) {
++ // wait for credit
++ if _, err := timedReceive(s.credit, t); err != nil {
++ if err == Closed && s.Error != nil {
+ err = s.Error()
- internal.Assert(err != nil)
+ }
- return nil, err
++ ack <- Outcome{Unsent, err, v}
++ return
+ }
- if err := s.engine().Inject(func() { s.doSend(snd) }); err != nil {
- return nil, err
- }
- return snd.sm, nil
- }
-
- // Send a message. Handler goroutine
- func (s *sender) doSend(snd sendMessage) {
- delivery, err := s.eLink.Send(snd.m)
- switch sm := snd.sm.(type) {
- case nil:
- delivery.Settle()
- case *sentMessage:
- sm.delivery = delivery
- if err != nil {
- sm.settled(err)
- } else {
- s.handler().sentMessages[delivery] = sm
++ // Send a message in handler goroutine
++ err := s.engine().Inject(func() {
++ if s.Error() != nil {
++ if ack != nil {
++ ack <- Outcome{Unsent, s.Error(), v}
++ }
++ return
+ }
- default:
- internal.Assert(false, "bad SentMessage type %T", snd.sm)
- }
- if s.eLink.Credit() > 0 {
- s.sendable() // Signal credit.
++ if delivery, err := s.eLink.Send(m); err == nil {
++ if ack != nil { // We must report an outcome
++ if s.SndSettle() == SndSettled {
++ delivery.Settle() // Pre-settle if required
++ ack <- Outcome{Presettled, nil, v}
++ } else {
++ s.handler().sentMessages[delivery] = sentMessage{ack, v}
++ }
++ } else { // ack == nil, can't report outcome
++ if s.SndSettle() != SndUnsettled { // Pre-settle unless we are forced not to.
++ delivery.Settle()
++ }
++ }
++ } else { // err != nil
++ if ack != nil {
++ ack <- Outcome{Unsent, err, v}
++ }
++ }
++ if s.eLink.Credit() > 0 { // Signal there is still credit
++ s.sendable()
++ }
++ })
++ if err != nil && ack != nil {
++ ack <- Outcome{Unsent, err, v}
+ }
+}
+
- // Signal the sender has credit. Any goroutine.
++// Set credit flag if not already set. Non-blocking, any goroutine
+func (s *sender) sendable() {
+ select { // Non-blocking
- case s.credit <- struct{}{}: // Set the flag if not already set.
++ case s.credit <- struct{}{}:
+ default:
+ }
+}
+
- func (s *sender) closed(err error) {
- s.link.closed(err)
- close(s.credit)
++func (s *sender) SendWaitableTimeout(m amqp.Message, t time.Duration) <-chan Outcome {
++ out := make(chan Outcome, 1)
++ s.SendAsyncTimeout(m, out, nil, t)
++ return out
+}
+
- func newSender(l link) *sender {
- s := &sender{link: l, credit: make(chan struct{}, 1)}
- s.handler().addLink(s.eLink, s)
- s.link.open()
- return s
- }
-
- // SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
- type SentMessage interface {
-
- // Disposition blocks till the message is acknowledged and returns the
- // disposition state.
- //
- // NoDisposition with Error() != nil means the Connection was closed before
- // the message was acknowledged.
- //
- // NoDisposition with Error() == nil means the message was pre-settled or
- // Forget() was called.
- Disposition() (Disposition, error)
-
- // DispositionTimeout is like Disposition but gives up after timeout, see Timeout.
- DispositionTimeout(time.Duration) (Disposition, error)
-
- // Forget interrupts any call to Disposition on this SentMessage and tells the
- // peer we are no longer interested in the disposition of this message.
- Forget()
-
- // Error returns the error that closed the disposition, or nil if there was no error.
- // If the disposition closed because the connection closed, it will return Closed.
- Error() error
-
- // Value is an optional value you wish to associate with the SentMessage. It
- // can be the message itself or some form of identifier.
- Value() interface{}
- SetValue(interface{})
- }
-
- // SentMessageSet is a concurrent-safe set of sent messages that can be checked
- // to get the next completed sent message
- type SentMessageSet struct {
- cases []reflect.SelectCase
- sm []SentMessage
- done chan SentMessage
- }
-
- func (s *SentMessageSet) Add(sm SentMessage) {
- s.sm = append(s.sm, sm)
- s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sm.(*sentMessage).done)})
++func (s *sender) SendForgetTimeout(m amqp.Message, t time.Duration) {
++ s.SendAsyncTimeout(m, nil, nil, t)
+}
+
- // Wait waits up to timeout and returns the next SentMessage that has a valid dispositionb
- // or an error.
- func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) (SentMessage, error) {
- s.cases = s.cases[:len(s.sm)] // Remove previous timeout cases
- if timeout == 0 { // Non-blocking
- s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault})
- } else {
- s.cases = append(s.cases,
- reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
++func (s *sender) SendSyncTimeout(m amqp.Message, t time.Duration) Outcome {
++ deadline := time.Now().Add(t)
++ ack := s.SendWaitableTimeout(m, t)
++ t = deadline.Sub(time.Now()) // Adjust for time already spent.
++ if t < 0 {
++ t = 0
+ }
- chosen, _, _ := reflect.Select(s.cases)
- if chosen > len(s.sm) {
- return nil, Timeout
++ if out, err := timedReceive(ack, t); err == nil {
++ return out.(Outcome)
+ } else {
- sm := s.sm[chosen]
- s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...)
- return sm, nil
++ if err == Closed && s.Error() != nil {
++ err = s.Error()
++ }
++ return Outcome{Unacknowledged, err, nil}
+ }
+}
+
- // SentMessage implementation
- type sentMessage struct {
- connection *connection
- done chan struct{}
- delivery proton.Delivery
- disposition Disposition
- err error
- value interface{}
++func (s *sender) SendAsync(m amqp.Message, ack chan<- Outcome, v interface{}) {
++ s.SendAsyncTimeout(m, ack, v, Forever)
+}
+
- func newSentMessage(c *connection) *sentMessage {
- return &sentMessage{connection: c, done: make(chan struct{})}
++func (s *sender) SendWaitable(m amqp.Message) <-chan Outcome {
++ return s.SendWaitableTimeout(m, Forever)
+}
+
- func (sm *sentMessage) SetValue(v interface{}) { sm.value = v }
- func (sm *sentMessage) Value() interface{} { return sm.value }
- func (sm *sentMessage) Disposition() (Disposition, error) {
- <-sm.done
- return sm.disposition, sm.err
++func (s *sender) SendForget(m amqp.Message) {
++ s.SendForgetTimeout(m, Forever)
+}
+
- func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
- if _, err := timedReceive(sm.done, timeout); err == Timeout {
- return sm.disposition, Timeout
- } else {
- return sm.disposition, sm.err
- }
++func (s *sender) SendSync(m amqp.Message) Outcome {
++ return <-s.SendWaitable(m)
+}
+
- func (sm *sentMessage) Forget() {
- sm.connection.engine.Inject(func() {
- sm.delivery.Settle()
- delete(sm.connection.handler.sentMessages, sm.delivery)
- })
- sm.finish()
++// handler goroutine
++func (s *sender) closed(err error) {
++ s.link.closed(err)
++ close(s.credit)
+}
+
- func (sm *sentMessage) settled(err error) {
- if sm.delivery.Settled() {
- sm.disposition = Disposition(sm.delivery.Remote().Type())
- }
- sm.err = err
- sm.finish()
++func newSender(l link) *sender {
++ s := &sender{link: l, credit: make(chan struct{}, 1)}
++ s.handler().addLink(s.eLink, s)
++ s.link.open()
++ return s
+}
+
- func (sm *sentMessage) finish() {
- select {
- case <-sm.done: // No-op if already closed
- default:
- close(sm.done)
- }
++// sentMessage records a sent message on the handler.
++type sentMessage struct {
++ ack chan<- Outcome
++ value interface{}
+}
+
- func (sm *sentMessage) Error() error { return sm.err }
-
- // IncomingSender is passed to the accept() function given to Connection.Listen()
- // when there is an incoming request for a sender link.
++// IncomingSender is sent on the Connection.Incoming() channel when there is
++// an incoming request to open a sender link.
+type IncomingSender struct {
+ incomingLink
+}
+
- // Link provides information about the incoming link.
- func (i *IncomingSender) Link() Link { return i }
-
- func (i *IncomingSender) AcceptSender() Sender { return i.Accept().(Sender) }
++// Accept accepts an incoming sender endpoint
++func (in *IncomingSender) Accept() Endpoint {
++ return in.accept(func() Endpoint { return newSender(in.link) })
++}
+
- func (i *IncomingSender) Accept() Endpoint {
- i.accepted = true
- return newSender(i.link)
++// Call in injected functions to check if the sender is valid.
++func (s *sender) valid() bool {
++ s2, ok := s.handler().links[s.eLink].(*sender)
++ return ok && s2 == s
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/session.go
----------------------------------------------------------------------
diff --cc electron/session.go
index 3531da6,0000000..18d8bc8
mode 100644,000000..100644
--- a/electron/session.go
+++ b/electron/session.go
@@@ -1,125 -1,0 +1,128 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "qpid.apache.org/proton"
+)
+
+// Session is an AMQP session, it contains Senders and Receivers.
+type Session interface {
+ Endpoint
+
- // Sender opens a new sender. v can be a string, which is used as the Target
- // address, or a SenderSettings struct containing more details settings.
- Sender(...LinkSetting) (Sender, error)
-
- // Receiver opens a new Receiver. v can be a string, which is used as the
- // Source address, or a ReceiverSettings struct containing more details
- // settings.
- Receiver(...LinkSetting) (Receiver, error)
++ // Sender opens a new sender.
++ Sender(...LinkOption) (Sender, error)
++
++ // Receiver opens a new Receiver.
++ Receiver(...LinkOption) (Receiver, error)
+}
+
+type session struct {
+ endpoint
+ eSession proton.Session
+ connection *connection
+ capacity uint
+}
+
- // SessionSetting can be passed when creating a sender or receiver.
- // See functions that return SessionSetting for details
- type SessionSetting func(*session)
++// SessionOption can be passed when creating a Session
++type SessionOption func(*session)
+
- // IncomingCapacity sets the size (in bytes) of the sessions incoming data buffer..
- func IncomingCapacity(cap uint) SessionSetting { return func(s *session) { s.capacity = cap } }
++// IncomingCapacity returns a Session Option that sets the size (in bytes) of
++// the sessions incoming data buffer..
++func IncomingCapacity(cap uint) SessionOption { return func(s *session) { s.capacity = cap } }
+
+// in proton goroutine
- func newSession(c *connection, es proton.Session, setting ...SessionSetting) *session {
++func newSession(c *connection, es proton.Session, setting ...SessionOption) *session {
+ s := &session{
+ connection: c,
+ eSession: es,
- endpoint: endpoint{str: es.String()},
++ endpoint: makeEndpoint(es.String()),
+ }
+ for _, set := range setting {
+ set(s)
+ }
+ c.handler.sessions[s.eSession] = s
+ s.eSession.SetIncomingCapacity(s.capacity)
+ s.eSession.Open()
+ return s
+}
+
+func (s *session) Connection() Connection { return s.connection }
+func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
+func (s *session) engine() *proton.Engine { return s.connection.engine }
++
+func (s *session) Close(err error) {
- s.engine().Inject(func() { localClose(s.eSession, err) })
++ s.engine().Inject(func() {
++ if s.Error() == nil {
++ localClose(s.eSession, err)
++ }
++ })
+}
+
- func (s *session) SetCapacity(bytes uint) { s.capacity = bytes }
-
- func (s *session) Sender(setting ...LinkSetting) (snd Sender, err error) {
++func (s *session) Sender(setting ...LinkOption) (snd Sender, err error) {
+ err = s.engine().InjectWait(func() error {
++ if s.Error() != nil {
++ return s.Error()
++ }
+ l, err := localLink(s, true, setting...)
+ if err == nil {
+ snd = newSender(l)
+ }
+ return err
+ })
+ return
+}
+
- func (s *session) Receiver(setting ...LinkSetting) (rcv Receiver, err error) {
++func (s *session) Receiver(setting ...LinkOption) (rcv Receiver, err error) {
+ err = s.engine().InjectWait(func() error {
++ if s.Error() != nil {
++ return s.Error()
++ }
+ l, err := localLink(s, false, setting...)
+ if err == nil {
+ rcv = newReceiver(l)
+ }
+ return err
+ })
+ return
+}
+
- // Called from handler on closed.
- func (s *session) closed(err error) {
- s.err.Set(err)
- s.err.Set(Closed)
- }
-
- // IncomingSession is passed to the accept() function given to Connection.Listen()
- // when there is an incoming session request.
++// IncomingSender is sent on the Connection.Incoming() channel when there is an
++// incoming request to open a session.
+type IncomingSession struct {
+ incoming
+ h *handler
+ pSession proton.Session
+ capacity uint
+}
+
- // AcceptCapacity sets the session buffer capacity of an incoming session in bytes.
- func (i *IncomingSession) AcceptSession(bytes uint) Session {
- i.capacity = bytes
- return i.Accept().(Session)
++func newIncomingSession(h *handler, ps proton.Session) *IncomingSession {
++ return &IncomingSession{incoming: makeIncoming(ps), h: h, pSession: ps}
+}
+
- func (i *IncomingSession) Accept() Endpoint {
- i.accepted = true
- return newSession(i.h.connection, i.pSession, IncomingCapacity(i.capacity))
++// SetCapacity sets the session buffer capacity of an incoming session in bytes.
++func (in *IncomingSession) SetCapacity(bytes uint) { in.capacity = bytes }
++
++// Accept an incoming session endpoint.
++func (in *IncomingSession) Accept() Endpoint {
++ return in.accept(func() Endpoint {
++ return newSession(in.h.connection, in.pSession, IncomingCapacity(in.capacity))
++ })
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/electron/time.go
----------------------------------------------------------------------
diff --cc electron/time.go
index 3407b82,0000000..51bfbc5
mode 100644,000000..100644
--- a/electron/time.go
+++ b/electron/time.go
@@@ -1,82 -1,0 +1,83 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
- "qpid.apache.org/internal"
++ "fmt"
++ "math"
+ "reflect"
+ "time"
+)
+
+// Timeout is the error returned if an operation does not complete on time.
+//
+// Methods named *Timeout in this package take time.Duration timeout parameter.
+//
+// If timeout > 0 and there is no result available before the timeout, they
+// return a zero or nil value and Timeout as an error.
+//
+// If timeout == 0 they will return a result if one is immediatley available or
+// nil/zero and Timeout as an error if not.
+//
+// If timeout == Forever the function will return only when there is a result or
+// some non-timeout error occurs.
+//
- var Timeout = internal.Errorf("timeout")
++var Timeout = fmt.Errorf("timeout")
+
+// Forever can be used as a timeout parameter to indicate wait forever.
- const Forever time.Duration = -1
++const Forever time.Duration = math.MaxInt64
+
+// timedReceive receives on channel (which can be a chan of any type), waiting
+// up to timeout.
+//
+// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block
+// forever. Other values mean block up to the timeout.
+//
+// Returns error Timeout on timeout, Closed on channel close.
+func timedReceive(channel interface{}, timeout time.Duration) (interface{}, error) {
+ cases := []reflect.SelectCase{
+ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)},
+ }
+ if timeout == 0 { // Non-blocking
+ cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})
+ } else { // Block up to timeout
+ cases = append(cases,
+ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
+ }
+ chosen, value, ok := reflect.Select(cases)
+ switch {
- case !ok:
- return nil, Closed
- case chosen == 0:
++ case chosen == 0 && ok:
+ return value.Interface(), nil
++ case chosen == 0 && !ok:
++ return nil, Closed
+ default:
+ return nil, Timeout
+ }
+}
+
+// After is like time.After but returns a nil channel if timeout == Forever
+// since selecting on a nil channel will never return.
+func After(timeout time.Duration) <-chan time.Time {
+ if timeout == Forever {
+ return nil
+ } else {
+ return time.After(timeout)
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/proton/engine.go
----------------------------------------------------------------------
diff --cc proton/engine.go
index 2cebb49,0000000..2e67ef7
mode 100644,000000..100644
--- a/proton/engine.go
+++ b/proton/engine.go
@@@ -1,402 -1,0 +1,403 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package proton
+
+// #include <proton/connection.h>
+// #include <proton/event.h>
+// #include <proton/handlers.h>
+// #include <proton/session.h>
+// #include <proton/transport.h>
+// #include <memory.h>
+// #include <stdlib.h>
+//
+// PN_HANDLE(REMOTE_ADDR)
+import "C"
+
+import (
+ "fmt"
+ "io"
+ "net"
- "qpid.apache.org/internal"
+ "sync"
+ "unsafe"
+)
+
+// Injecter allows functions to be "injected" into the event-processing loop, to
+// be called in the same goroutine as event handlers.
+type Injecter interface {
+ // Inject a function into the engine goroutine.
+ //
+ // f() will be called in the same goroutine as event handlers, so it can safely
+ // use values belonging to event handlers without synchronization. f() should
+ // not block, no further events or injected functions can be processed until
+ // f() returns.
+ //
+ // Returns a non-nil error if the function could not be injected and will
+ // never be called. Otherwise the function will eventually be called.
+ //
+ // Note that proton values (Link, Session, Connection etc.) that existed when
+ // Inject(f) was called may have become invalid by the time f() is executed.
+ // Handlers should handle keep track of Closed events to ensure proton values
+ // are not used after they become invalid. One technique is to have map from
+ // proton values to application values. Check that the map has the correct
+ // proton/application value pair at the start of the injected function and
+ // delete the value from the map when handling a Closed event.
+ Inject(f func()) error
+
+ // InjectWait is like Inject but does not return till f() has completed.
+ // If f() cannot be injected it returns the error from Inject(), otherwise
+ // it returns the error from f()
+ InjectWait(f func() error) error
+}
+
+// bufferChan manages a pair of ping-pong buffers to pass bytes through a channel.
+type bufferChan struct {
+ buffers chan []byte
+ buf1, buf2 []byte
+}
+
+func newBufferChan(size int) *bufferChan {
+ return &bufferChan{make(chan []byte), make([]byte, size), make([]byte, size)}
+}
+
+func (b *bufferChan) buffer() []byte {
+ b.buf1, b.buf2 = b.buf2, b.buf1 // Alternate buffers.
+ return b.buf1[:cap(b.buf1)]
+}
+
+// Engine reads from a net.Conn, decodes AMQP events and calls the appropriate
+// Handler functions sequentially in a single goroutine. Actions taken by
+// Handler functions (such as sending messages) are encoded and written to the
+// net.Conn. You can create multiple Engines to handle multiple connections
+// concurrently.
+//
+// You implement the EventHandler and/or MessagingHandler interfaces and provide
+// those values to NewEngine(). Their HandleEvent method will be called in the
+// event-handling goroutine.
+//
+// Handlers can pass values from an event (Connections, Links, Deliveries etc.) to
+// other goroutines, store them, or use them as map indexes. Effectively they are
+// just pointers. Other goroutines cannot call their methods directly but they can
+// can create a function closure to call such methods and pass it to Engine.Inject()
+// to have it evaluated in the engine goroutine.
+//
+// You are responsible for ensuring you don't use an event value after it is
+// invalid. The handler methods will tell you when a value is no longer valid. For
+// example after a LinkClosed event, that link is no longer valid. If you do
+// Link.Close() yourself (in a handler or injected function) the link remains valid
+// until the corresponing LinkClosed event is received by the handler.
+//
+// Engine.Close() will take care of cleaning up any remaining values when you are
+// done with the Engine. All values associated with a engine become invalid when you
+// call Engine.Close()
+//
+// The qpid.apache.org/proton/concurrent package will do all this for you, so it
+// may be a better choice for some applications.
+//
+type Engine struct {
+ // Error is set on exit from Run() if there was an error.
- err internal.ErrorHolder
++ err ErrorHolder
+ inject chan func()
+
+ conn net.Conn
+ connection Connection
+ transport Transport
+ collector *C.pn_collector_t
+ read *bufferChan // Read buffers channel.
+ write *bufferChan // Write buffers channel.
+ handlers []EventHandler // Handlers for proton events.
+ running chan struct{} // This channel will be closed when the goroutines are done.
+ closeOnce sync.Once
+}
+
+const bufferSize = 4096
+
- // Map of Connection to *Engine
- var engines = internal.MakeSafeMap()
-
+// NewEngine initializes a engine with a connection and handlers. To start it running:
+// eng := NewEngine(...)
+// go run eng.Run()
+// The goroutine will exit when the engine is closed or disconnected.
+// You can check for errors on Engine.Error.
+//
+func NewEngine(conn net.Conn, handlers ...EventHandler) (*Engine, error) {
+ // Save the connection ID for Connection.String()
+ eng := &Engine{
+ inject: make(chan func()),
+ conn: conn,
+ transport: Transport{C.pn_transport()},
+ connection: Connection{C.pn_connection()},
+ collector: C.pn_collector(),
+ handlers: handlers,
+ read: newBufferChan(bufferSize),
+ write: newBufferChan(bufferSize),
+ running: make(chan struct{}),
+ }
+ if eng.transport.IsNil() || eng.connection.IsNil() || eng.collector == nil {
- return nil, internal.Errorf("failed to allocate engine")
++ return nil, fmt.Errorf("failed to allocate engine")
+ }
+
+ // TODO aconway 2015-06-25: connection settings for user, password, container etc.
+ // before transport.Bind() Set up connection before Engine, allow Engine or Reactor
+ // to run connection.
+
+ // Unique container-id by default.
- eng.connection.SetContainer(internal.UUID4().String())
++ eng.connection.SetContainer(UUID4().String())
+ pnErr := eng.transport.Bind(eng.connection)
+ if pnErr != 0 {
- return nil, internal.Errorf("cannot setup engine: %s", internal.PnErrorCode(pnErr))
++ return nil, fmt.Errorf("cannot setup engine: %s", PnErrorCode(pnErr))
+ }
+ C.pn_connection_collect(eng.connection.pn, eng.collector)
+ eng.connection.Open()
- connectionContexts.Put(eng.connection, connectionContext{eng.String()})
+ return eng, nil
+}
+
+func (eng *Engine) String() string {
+ return fmt.Sprintf("%s-%s", eng.conn.LocalAddr(), eng.conn.RemoteAddr())
+}
+
+func (eng *Engine) Id() string {
+ return fmt.Sprintf("%eng", &eng)
+}
+
+func (eng *Engine) Error() error {
+ return eng.err.Get()
+}
+
+// Inject a function into the Engine's event loop.
+//
+// f() will be called in the same event-processing goroutine that calls Handler
+// methods. f() can safely call methods on values that belong to this engine
+// (Sessions, Links etc)
+//
+// The injected function has no parameters or return values. It is normally a
+// closure and can use channels to communicate with the injecting goroutine if
+// necessary.
+//
+// Returns a non-nil error if the engine is closed before the function could be
+// injected.
+func (eng *Engine) Inject(f func()) error {
+ select {
+ case eng.inject <- f:
+ return nil
+ case <-eng.running:
+ return eng.Error()
+ }
+}
+
+// InjectWait is like Inject but does not return till f() has completed or the
+// engine is closed, and returns an error value from f()
+func (eng *Engine) InjectWait(f func() error) error {
+ done := make(chan error)
+ defer close(done)
+ err := eng.Inject(func() { done <- f() })
+ if err != nil {
+ return err
+ }
+ select {
+ case <-eng.running:
+ return eng.Error()
+ case err := <-done:
+ return err
+ }
+}
+
+// Server puts the Engine in server mode, meaning it will auto-detect security settings on
+// the incoming connnection such as use of SASL and SSL.
+// Must be called before Run()
+//
+func (eng *Engine) Server() { eng.transport.SetServer() }
+
+// Close the engine's connection, returns when the engine has exited.
+func (eng *Engine) Close(err error) {
+ eng.err.Set(err)
+ eng.Inject(func() {
+ CloseError(eng.connection, err)
+ })
+ <-eng.running
+}
+
+// Disconnect the engine's connection without and AMQP close, returns when the engine has exited.
+func (eng *Engine) Disconnect(err error) {
+ eng.err.Set(err)
+ eng.conn.Close()
+ <-eng.running
+}
+
+// Run the engine. Engine.Run() will exit when the engine is closed or
+// disconnected. You can check for errors after exit with Engine.Error().
+//
+func (eng *Engine) Run() error {
+ wait := sync.WaitGroup{}
+ wait.Add(2) // Read and write goroutines
+
+ readErr := make(chan error, 1) // Don't block
+ go func() { // Read goroutine
+ defer wait.Done()
+ for {
+ rbuf := eng.read.buffer()
+ n, err := eng.conn.Read(rbuf)
+ if n > 0 {
+ eng.read.buffers <- rbuf[:n]
+ }
+ if err != nil {
+ readErr <- err
+ close(readErr)
+ close(eng.read.buffers)
+ return
+ }
+ }
+ }()
+
+ writeErr := make(chan error, 1) // Don't block
+ go func() { // Write goroutine
+ defer wait.Done()
+ for {
+ wbuf, ok := <-eng.write.buffers
+ if !ok {
+ return
+ }
+ _, err := eng.conn.Write(wbuf)
+ if err != nil {
+ writeErr <- err
+ close(writeErr)
+ return
+ }
+ }
+ }()
+
+ wbuf := eng.write.buffer()[:0]
+
+ for eng.err.Get() == nil {
+ if len(wbuf) == 0 {
+ eng.pop(&wbuf)
+ }
+ // Don't set wchan unless there is something to write.
+ var wchan chan []byte
+ if len(wbuf) > 0 {
+ wchan = eng.write.buffers
+ }
+
+ select {
+ case buf, ok := <-eng.read.buffers: // Read a buffer
+ if ok {
+ eng.push(buf)
+ }
+ case wchan <- wbuf: // Write a buffer
+ wbuf = eng.write.buffer()[:0]
+ case f, ok := <-eng.inject: // Function injected from another goroutine
+ if ok {
+ f()
+ }
+ case err := <-readErr:
+ eng.netError(err)
+ case err := <-writeErr:
+ eng.netError(err)
+ }
+ eng.process()
+ }
+ close(eng.write.buffers)
+ eng.conn.Close() // Make sure connection is closed
+ wait.Wait()
+ close(eng.running) // Signal goroutines have exited and Error is set.
+
- connectionContexts.Delete(eng.connection)
++ // Execute any injected functions for side effects on application data structures.
++ inject := eng.inject
++ eng.inject = nil // Further calls to Inject() will return an error.
++ for f := range inject {
++ f()
++ }
++
+ if !eng.connection.IsNil() {
+ eng.connection.Free()
+ }
+ if !eng.transport.IsNil() {
+ eng.transport.Free()
+ }
+ if eng.collector != nil {
+ C.pn_collector_free(eng.collector)
+ }
+ for _, h := range eng.handlers {
+ switch h := h.(type) {
+ case cHandler:
+ C.pn_handler_free(h.pn)
+ }
+ }
+ return eng.err.Get()
+}
+
+func (eng *Engine) netError(err error) {
+ eng.err.Set(err)
+ eng.transport.CloseHead()
+ eng.transport.CloseTail()
+}
+
+func minInt(a, b int) int {
+ if a < b {
+ return a
+ } else {
+ return b
+ }
+}
+
+func (eng *Engine) pop(buf *[]byte) {
+ pending := int(eng.transport.Pending())
+ switch {
+ case pending == int(C.PN_EOS):
+ *buf = (*buf)[:]
+ return
+ case pending < 0:
- panic(internal.Errorf("%s", internal.PnErrorCode(pending)))
++ panic(fmt.Errorf("%s", PnErrorCode(pending)))
+ }
+ size := minInt(pending, cap(*buf))
+ *buf = (*buf)[:size]
+ if size == 0 {
+ return
+ }
+ C.memcpy(unsafe.Pointer(&(*buf)[0]), eng.transport.Head(), C.size_t(size))
- internal.Assert(size > 0)
++ assert(size > 0)
+ eng.transport.Pop(uint(size))
+}
+
+func (eng *Engine) push(buf []byte) {
+ buf2 := buf
+ for len(buf2) > 0 {
+ n := eng.transport.Push(buf2)
+ if n <= 0 {
- panic(internal.Errorf("error in transport: %s", internal.PnErrorCode(n)))
++ panic(fmt.Errorf("error in transport: %s", PnErrorCode(n)))
+ }
+ buf2 = buf2[n:]
+ }
+}
+
+func (eng *Engine) handle(e Event) {
+ for _, h := range eng.handlers {
+ h.HandleEvent(e)
+ }
+ if e.Type() == ETransportClosed {
+ eng.err.Set(io.EOF)
+ }
+}
+
+func (eng *Engine) process() {
+ for ce := C.pn_collector_peek(eng.collector); ce != nil; ce = C.pn_collector_peek(eng.collector) {
+ eng.handle(makeEvent(ce, eng))
+ C.pn_collector_pop(eng.collector)
+ }
+}
+
+func (eng *Engine) Connection() Connection { return eng.connection }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9e788b2d/proton/error.go
----------------------------------------------------------------------
diff --cc proton/error.go
index f9cc948,0000000..80d9680
mode 100644,000000..100644
--- a/proton/error.go
+++ b/proton/error.go
@@@ -1,96 -1,0 +1,96 @@@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
++// Internal implementation details - ignore.
+package proton
+
++// #cgo LDFLAGS: -lqpid-proton
+// #include <proton/error.h>
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "fmt"
- "reflect"
- "runtime"
++ "sync"
++ "sync/atomic"
+)
+
- var pnErrorNames = map[int]string{
- C.PN_EOS: "end of data",
- C.PN_ERR: "error",
- C.PN_OVERFLOW: "overflow",
- C.PN_UNDERFLOW: "underflow",
- C.PN_STATE_ERR: "bad state",
- C.PN_ARG_ERR: "invalid argument",
- C.PN_TIMEOUT: "timeout",
- C.PN_INTR: "interrupted",
- C.PN_INPROGRESS: "in progress",
- }
++type PnErrorCode int
+
- func pnErrorName(code int) string {
- name := pnErrorNames[code]
- if name != "" {
- return name
- } else {
- return "unknown error code"
++func (e PnErrorCode) String() string {
++ switch e {
++ case C.PN_EOS:
++ return "end-of-data"
++ case C.PN_ERR:
++ return "error"
++ case C.PN_OVERFLOW:
++ return "overflow"
++ case C.PN_UNDERFLOW:
++ return "underflow"
++ case C.PN_STATE_ERR:
++ return "bad-state"
++ case C.PN_ARG_ERR:
++ return "invalid-argument"
++ case C.PN_TIMEOUT:
++ return "timeout"
++ case C.PN_INTR:
++ return "interrupted"
++ case C.PN_INPROGRESS:
++ return "in-progress"
++ default:
++ return fmt.Sprintf("unknown-error(%d)", e)
+ }
+}
+
- type BadUnmarshal struct {
- AMQPType C.pn_type_t
- GoType reflect.Type
++func PnError(e *C.pn_error_t) error {
++ if e == nil || C.pn_error_code(e) == 0 {
++ return nil
++ }
++ return fmt.Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e)))
+}
+
- func newBadUnmarshal(pnType C.pn_type_t, v interface{}) *BadUnmarshal {
- return &BadUnmarshal{pnType, reflect.TypeOf(v)}
++// ErrorHolder is a goroutine-safe error holder that keeps the first error that is set.
++type ErrorHolder struct {
++ once sync.Once
++ value atomic.Value
+}
+
- func (e BadUnmarshal) Error() string {
- if e.GoType.Kind() != reflect.Ptr {
- return fmt.Sprintf("proton: cannot unmarshal to type %s, not a pointer", e.GoType)
- } else {
- return fmt.Sprintf("proton: cannot unmarshal AMQP %s to %s", getPnType(e.AMQPType), e.GoType)
++// Set the error if not already set, return the error in the Holder.
++func (e *ErrorHolder) Set(err error) {
++ if err != nil {
++ e.once.Do(func() { e.value.Store(err) })
+ }
+}
+
- // errorf creates an error with a formatted message
- func errorf(format string, a ...interface{}) error {
- return fmt.Errorf("proton: %s", fmt.Sprintf(format, a...))
++// Get the error.
++func (e *ErrorHolder) Get() (err error) {
++ err, _ = e.value.Load().(error)
++ return
+}
+
- func pnDataError(data *C.pn_data_t) string {
- err := C.pn_data_error(data)
- if err != nil && int(C.pn_error_code(err)) != 0 {
- return C.GoString(C.pn_error_text(err))
- }
- return ""
- }
-
- // doRecover is called to recover from internal panics
- func doRecover(err *error) {
- r := recover()
- switch r := r.(type) {
- case nil:
- return
- case runtime.Error:
- panic(r)
- case error:
- *err = r
- default:
- panic(r)
++// assert panics if condition is false with optional formatted message
++func assert(condition bool, format ...interface{}) {
++ if !condition {
++ if len(format) > 0 {
++ panic(fmt.Errorf(format[0].(string), format[1:]...))
++ } else {
++ panic(fmt.Errorf("assertion failed"))
++ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org