You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/10/23 16:36:10 UTC
[08/50] [abbrv] qpid-proton git commit: PROTON-1011: Go example of
event driven broker. Package renaming and some new features.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
new file mode 100644
index 0000000..92c0b90
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -0,0 +1,232 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "qpid.apache.org/internal"
+ "qpid.apache.org/proton"
+ "qpid.apache.org/amqp"
+ "sync"
+ "time"
+)
+
+// Receiver is a Link that receives messages.
+//
+type Receiver interface {
+ Link
+
+ // Receive blocks until a message is available or until the Receiver is closed
+ // and has no more buffered messages.
+ Receive() (ReceivedMessage, error)
+
+ // ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
+ //
+ // Note that that if Prefetch is false, after a Timeout the credit issued by
+ // Receive remains on the link. It will be used by the next call to Receive.
+ ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)
+
+ // Prefetch==true means the Receiver will automatically issue credit to the
+ // remote sender to keep its buffer as full as possible, i.e. it will
+ // "pre-fetch" messages independently of the application calling
+ // Receive(). This gives good throughput for applications that handle a
+ // continuous stream of messages. Larger capacity may improve throughput, the
+ // optimal value depends on the characteristics of your application.
+ //
+ // Prefetch==false means the Receiver will issue only issue credit when you
+ // call Receive(), and will only issue enough credit to satisfy the calls
+ // actually made. This gives lower throughput but will not fetch any messages
+ // in advance. It is good for synchronous applications that need to evaluate
+ // each message before deciding whether to receive another. The
+ // request-response pattern is a typical example. If you make concurrent
+ // calls to Receive with pre-fetch disabled, you can improve performance by
+ // setting the capacity close to the expected number of concurrent calls.
+ //
+ Prefetch() bool
+
+ // Capacity is the size (number of messages) of the local message buffer
+ // These are messages received but not yet returned to the application by a call to Receive()
+ Capacity() int
+
+ // SetCapacity sets Capacity and Prefetch of an accepted Receiver.
+ // May only be called in an accept() function, see Connection.Listen()
+ SetCapacity(capacity int, prefetch bool)
+}
+
+// Flow control policy for a receiver.
+type policy interface {
+ // Called at the start of Receive() to adjust credit before fetching a message.
+ Pre(*receiver)
+ // Called after Receive() has received a message from Buffer() before it returns.
+ // Non-nil error means no message was received because of an error.
+ Post(*receiver, error)
+}
+
+type prefetchPolicy struct{}
+
+func (p prefetchPolicy) Flow(r *receiver) {
+ r.engine().Inject(func() {
+ _, _, max := r.credit()
+ if max > 0 {
+ r.eLink.Flow(max)
+ }
+ })
+}
+func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) }
+func (p prefetchPolicy) Post(r *receiver, err error) {
+ if err == nil {
+ p.Flow(r)
+ }
+}
+
+type noPrefetchPolicy struct{ waiting int }
+
+func (p noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine
+ r.engine().Inject(func() {
+ len, credit, max := r.credit()
+ add := p.waiting - (len + credit)
+ if add > max {
+ add = max // Don't overflow
+ }
+ if add > 0 {
+ r.eLink.Flow(add)
+ }
+ })
+}
+func (p noPrefetchPolicy) Pre(r *receiver) { p.waiting++; p.Flow(r) }
+func (p noPrefetchPolicy) Post(r *receiver, err error) {
+ p.waiting--
+ if err == nil {
+ p.Flow(r)
+ }
+}
+
+// Receiver implementation
+type receiver struct {
+ link
+ buffer chan ReceivedMessage
+ policy policy
+ setupOnce sync.Once
+}
+
+func (r *receiver) SetCapacity(capacity int, prefetch bool) {
+ internal.Assert(r.inAccept, "Receiver.SetCapacity called outside of accept function")
+ r.capacity = capacity
+ r.prefetch = prefetch
+}
+
+func (r *receiver) setup() {
+ if r.capacity < 1 {
+ r.capacity = 1
+ }
+ if r.prefetch {
+ r.policy = &prefetchPolicy{}
+ } else {
+ r.policy = &noPrefetchPolicy{}
+ }
+ r.buffer = make(chan ReceivedMessage, r.capacity)
+}
+
+// call in proton goroutine.
+func (r *receiver) credit() (buffered, credit, max int) {
+ return len(r.buffer), r.eLink.Credit(), cap(r.buffer) - len(r.buffer)
+}
+
+func (r *receiver) Capacity() int { return cap(r.buffer) }
+func (r *receiver) Prefetch() bool { return r.prefetch }
+
+func (r *receiver) Receive() (rm ReceivedMessage, err error) {
+ return r.ReceiveTimeout(Forever)
+}
+
+func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
+ r.setupOnce.Do(r.setup)
+ internal.Assert(r.buffer != nil, "Receiver is not open: %s", r)
+ r.policy.Pre(r)
+ defer func() { r.policy.Post(r, err) }()
+ rmi, ok, timedout := timedReceive(r.buffer, timeout)
+ switch {
+ case timedout:
+ return ReceivedMessage{}, Timeout
+ case !ok:
+ return ReceivedMessage{}, r.Error()
+ default:
+ return rmi.(ReceivedMessage), nil
+ }
+}
+
+// Called in proton goroutine on MMessage event.
+func (r *receiver) message(delivery proton.Delivery) {
+ if r.eLink.State().RemoteClosed() {
+ localClose(r.eLink, r.eLink.RemoteCondition().Error())
+ return
+ }
+ if delivery.HasMessage() {
+ m, err := delivery.Message()
+ if err != nil {
+ localClose(r.eLink, err)
+ return
+ }
+ internal.Assert(m != nil)
+ r.eLink.Advance()
+ if r.eLink.Credit() < 0 {
+ localClose(r.eLink, internal.Errorf("received message in excess of credit limit"))
+ } else {
+ // We never issue more credit than cap(buffer) so this will not block.
+ r.buffer <- ReceivedMessage{m, delivery, r}
+ }
+ }
+}
+
+func (r *receiver) open() {
+ r.setupOnce.Do(r.setup)
+ r.link.open()
+ r.handler().addLink(r.eLink, r)
+}
+
+func (r *receiver) closed(err error) {
+ r.link.closed(err)
+ if r.buffer != nil {
+ close(r.buffer)
+ }
+}
+
+// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
+type ReceivedMessage struct {
+ // Message is the received message.
+ Message amqp.Message
+
+ eDelivery proton.Delivery
+ receiver Receiver
+}
+
+// Acknowledge a ReceivedMessage with the given disposition code.
+func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error {
+ return rm.receiver.(*receiver).engine().InjectWait(func() error {
+ // Settle doesn't return an error but if the receiver is broken the settlement won't happen.
+ rm.eDelivery.SettleAs(uint64(disposition))
+ return rm.receiver.Error()
+ })
+}
+
+// Accept is short for Acknowledge(Accpeted)
+func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
+
+// Reject is short for Acknowledge(Rejected)
+func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
new file mode 100644
index 0000000..3124f74
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -0,0 +1,319 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+// #include <proton/disposition.h>
+import "C"
+
+import (
+ "container/list"
+ "qpid.apache.org/internal"
+ "qpid.apache.org/proton"
+ "qpid.apache.org/amqp"
+ "reflect"
+ "time"
+)
+
+// Sender is a Link that sends messages.
+type Sender interface {
+ Link
+
+ // Send a message without waiting for acknowledgement. Returns a SentMessage.
+ // use SentMessage.Disposition() to wait for acknowledgement and get the
+ // disposition code.
+ //
+ // If the send buffer is full, send blocks until there is space in the buffer.
+ Send(m amqp.Message) (sm SentMessage, err error)
+
+ // SendTimeout is like send but only waits up to timeout for buffer space.
+ //
+ // Returns Timeout error if the timeout expires and the message has not been sent.
+ SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err error)
+
+ // Send a message and forget it, there will be no acknowledgement.
+ // If the send buffer is full, send blocks until there is space in the buffer.
+ SendForget(m amqp.Message) error
+
+ // SendForgetTimeout is like send but only waits up to timeout for buffer space.
+ // Returns Timeout error if the timeout expires and the message has not been sent.
+ SendForgetTimeout(m amqp.Message, timeout time.Duration) error
+
+ // Credit indicates how many messages the receiving end of the link can accept.
+ //
+ // On a Sender credit can be negative, meaning that messages in excess of the
+ // receiver's credit limit have been buffered locally till credit is available.
+ Credit() (int, error)
+}
+
+type sendMessage struct {
+ m amqp.Message
+ sm SentMessage
+}
+
+type sender struct {
+ link
+ blocked list.List // Channel of sendMessage for blocked senders.
+}
+
+// Disposition indicates the outcome of a settled message delivery.
+type Disposition uint64
+
+const (
+ // No disposition available: pre-settled, not yet acknowledged or an error occurred
+ NoDisposition Disposition = 0
+ // Message was accepted by the receiver
+ Accepted = proton.Accepted
+ // Message was rejected as invalid by the receiver
+ Rejected = proton.Rejected
+ // Message was not processed by the receiver but may be processed by some other receiver.
+ Released = proton.Released
+)
+
+// String human readable name for a Disposition.
+func (d Disposition) String() string {
+ switch d {
+ case NoDisposition:
+ return "no-disposition"
+ case Accepted:
+ return "accepted"
+ case Rejected:
+ return "rejected"
+ case Released:
+ return "released"
+ default:
+ return "unknown"
+ }
+}
+
+// Send a message, assumes there is credit
+func (s *sender) doSend(snd sendMessage) {
+ delivery, err := s.eLink.Send(snd.m)
+ switch sm := snd.sm.(type) {
+ case nil:
+ delivery.Settle()
+ case *sentMessage:
+ sm.delivery = delivery
+ if err != nil {
+ sm.settled(err)
+ } else {
+ s.handler().sentMessages[delivery] = sm
+ }
+ default:
+ internal.Assert(false, "bad SentMessage type %T", snd.sm)
+ }
+}
+
+func (s *sender) popBlocked() chan sendMessage {
+ if s.blocked.Len() > 0 {
+ return s.blocked.Remove(s.blocked.Front()).(chan sendMessage)
+ }
+ return nil
+}
+
+func (s *sender) Send(m amqp.Message) (SentMessage, error) {
+ return s.SendTimeout(m, Forever)
+}
+
+func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) (SentMessage, error) {
+ var sm SentMessage
+ if s.sndSettle == SndSettled {
+ sm = nil
+ } else {
+ sm = newSentMessage(s.session.connection)
+ }
+ return s.sendInternal(sendMessage{m, sm}, timeout)
+}
+
+func (s *sender) SendForget(m amqp.Message) error {
+ return s.SendForgetTimeout(m, Forever)
+}
+
+func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error {
+ snd := sendMessage{m, nil}
+ _, err := s.sendInternal(snd, timeout)
+ return err
+}
+
+func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) {
+ if s.Error() != nil {
+ return nil, s.Error()
+ }
+ var err error
+ if timeout == 0 {
+ err = s.engine().InjectWait(func() error {
+ if s.eLink.Credit() > 0 {
+ s.doSend(snd)
+ return nil
+ }
+ return Timeout
+ })
+ } else {
+ buf := make(chan sendMessage)
+ done := make(chan struct{})
+ defer close(buf)
+ s.engine().Inject(func() { // Runs concurrently
+ if s.eLink.Credit() > 0 {
+ s.doSend(snd)
+ close(done) // Signal already sent
+ } else {
+ s.blocked.PushBack(buf)
+ }
+ })
+ select {
+ case <-done: // Sent without blocking
+ case buf <- snd: // Sent via blocking channel
+ case <-s.done:
+ err = s.Error()
+ case <-After(timeout):
+ err = Timeout
+ }
+ }
+ if err != nil {
+ return nil, err
+ }
+ return snd.sm, nil
+}
+
+func (s *sender) closed(err error) {
+ s.link.closed(err)
+}
+
+func (s *sender) open() {
+ s.link.open()
+ s.handler().addLink(s.eLink, s)
+}
+
+// SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
+type SentMessage interface {
+
+ // Disposition blocks till the message is acknowledged and returns the
+ // disposition state.
+ //
+ // NoDisposition with Error() != nil means the Connection was closed before
+ // the message was acknowledged.
+ //
+ // NoDisposition with Error() == nil means the message was pre-settled or
+ // Forget() was called.
+ Disposition() (Disposition, error)
+
+ // DispositionTimeout is like Disposition but gives up after timeout, see Timeout.
+ DispositionTimeout(time.Duration) (Disposition, error)
+
+ // Forget interrupts any call to Disposition on this SentMessage and tells the
+ // peer we are no longer interested in the disposition of this message.
+ Forget()
+
+ // Error returns the error that closed the disposition, or nil if there was no error.
+ // If the disposition closed because the connection closed, it will return Closed.
+ Error() error
+
+ // Value is an optional value you wish to associate with the SentMessage. It
+ // can be the message itself or some form of identifier.
+ Value() interface{}
+ SetValue(interface{})
+}
+
+// SentMessageSet is a concurrent-safe set of sent messages that can be checked
+// to get the next completed sent message
+type SentMessageSet struct {
+ cases []reflect.SelectCase
+ sm []SentMessage
+ done chan SentMessage
+}
+
+func (s *SentMessageSet) Add(sm SentMessage) {
+ s.sm = append(s.sm, sm)
+ s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(sm.(*sentMessage).done)})
+}
+
+// Wait waits up to timeout and returns the next SentMessage that has a valid dispositionb
+// or an error.
+func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) (SentMessage, error) {
+ s.cases = s.cases[:len(s.sm)] // Remove previous timeout cases
+ if timeout == 0 { // Non-blocking
+ s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault})
+ } else {
+ s.cases = append(s.cases,
+ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
+ }
+ chosen, _, _ := reflect.Select(s.cases)
+ if chosen > len(s.sm) {
+ return nil, Timeout
+ } else {
+ sm := s.sm[chosen]
+ s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...)
+ return sm, nil
+ }
+}
+
+// SentMessage implementation
+type sentMessage struct {
+ connection *connection
+ done chan struct{}
+ delivery proton.Delivery
+ disposition Disposition
+ err error
+ value interface{}
+}
+
+func newSentMessage(c *connection) *sentMessage {
+ return &sentMessage{connection: c, done: make(chan struct{})}
+}
+
+func (sm *sentMessage) SetValue(v interface{}) { sm.value = v }
+func (sm *sentMessage) Value() interface{} { return sm.value }
+func (sm *sentMessage) Disposition() (Disposition, error) {
+ <-sm.done
+ return sm.disposition, sm.err
+}
+
+func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
+ if _, _, timedout := timedReceive(sm.done, timeout); timedout {
+ return sm.disposition, Timeout
+ } else {
+ return sm.disposition, sm.err
+ }
+}
+
+func (sm *sentMessage) Forget() {
+ sm.connection.engine.Inject(func() {
+ sm.delivery.Settle()
+ delete(sm.connection.handler.sentMessages, sm.delivery)
+ })
+ sm.finish()
+}
+
+func (sm *sentMessage) settled(err error) {
+ if sm.delivery.Settled() {
+ sm.disposition = Disposition(sm.delivery.Remote().Type())
+ }
+ sm.err = err
+ sm.finish()
+}
+
+func (sm *sentMessage) finish() {
+ select {
+ case <-sm.done: // No-op if already closed
+ default:
+ close(sm.done)
+ }
+}
+
+func (sm *sentMessage) Error() error { return sm.err }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
new file mode 100644
index 0000000..612658a
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "qpid.apache.org/proton"
+)
+
+// Session is an AMQP session, it contains Senders and Receivers.
+//
+type Session interface {
+ Endpoint
+
+ // Sender opens a new sender. v can be a string, which is used as the Target
+ // address, or a SenderSettings struct containing more details settings.
+ Sender(...LinkSetting) (Sender, error)
+
+ // Receiver opens a new Receiver. v can be a string, which is used as the
+ // Source address, or a ReceiverSettings struct containing more details
+ // settings.
+ Receiver(...LinkSetting) (Receiver, error)
+
+ // SetCapacity sets the session buffer capacity in bytes.
+ // Only has effect if called in an accept() function, see Connection.Listen()
+ SetCapacity(bytes uint)
+}
+
+type session struct {
+ endpoint
+ eSession proton.Session
+ connection *connection
+ capacity uint
+}
+
+// in proton goroutine
+func newSession(c *connection, es proton.Session) *session {
+ return &session{
+ connection: c,
+ eSession: es,
+ endpoint: endpoint{str: es.String()},
+ }
+}
+
+func (s *session) Connection() Connection { return s.connection }
+func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
+func (s *session) engine() *proton.Engine { return s.connection.engine }
+func (s *session) Close(err error) {
+ s.engine().Inject(func() { localClose(s.eSession, err) })
+}
+
+func (s *session) SetCapacity(bytes uint) { s.capacity = bytes }
+
+func (s *session) Sender(setting ...LinkSetting) (snd Sender, err error) {
+ err = s.engine().InjectWait(func() error {
+ l, err := localLink(s, true, setting...)
+ if err == nil {
+ snd = &sender{link: *l}
+ snd.(*sender).open()
+ }
+ return err
+ })
+ return
+}
+
+func (s *session) Receiver(setting ...LinkSetting) (rcv Receiver, err error) {
+ err = s.engine().InjectWait(func() error {
+ l, err := localLink(s, false, setting...)
+ if err == nil {
+ rcv = &receiver{link: *l}
+ rcv.(*receiver).open()
+ }
+ return err
+ })
+ return
+}
+
+// Called from handler on closed.
+func (s *session) closed(err error) {
+ s.err.Set(err)
+ s.err.Set(Closed)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
new file mode 100644
index 0000000..ee61332
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+ "qpid.apache.org/internal"
+ "reflect"
+ "time"
+)
+
+// Timeout is the error returned if an operation does not complete on time.
+//
+// Methods named *Timeout in this package take time.Duration timeout parameter.
+//
+// If timeout > 0 and there is no result available before the timeout, they
+// return a zero or nil value and Timeout as an error.
+//
+// If timeout == 0 they will return a result if one is immediatley available or
+// nil/zero and Timeout as an error if not.
+//
+// If timeout == Forever the function will return only when there is a result or
+// some non-timeout error occurs.
+//
+var Timeout = internal.Errorf("timeout")
+
+// Forever can be used as a timeout parameter to indicate wait forever.
+const Forever time.Duration = -1
+
+// timedReceive receives on channel (which can be a chan of any type), waiting
+// up to timeout.
+//
+// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block
+// forever. Other values mean block up to the timeout.
+//
+func timedReceive(channel interface{}, timeout time.Duration) (value interface{}, ok bool, timedout bool) {
+ cases := []reflect.SelectCase{
+ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)},
+ }
+ switch {
+ case timeout == 0: // Non-blocking receive
+ cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})
+ case timeout == Forever: // Block forever, nothing to add
+ default: // Block up to timeout
+ cases = append(cases,
+ reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(time.After(timeout))})
+ }
+ chosen, recv, recvOk := reflect.Select(cases)
+ switch {
+ case chosen == 0:
+ return recv.Interface(), recvOk, false
+ default:
+ return nil, false, true
+ }
+}
+
+// After is like time.After but returns a nil channel if timeout == Forever
+// since selecting on a nil channel will never return.
+func After(timeout time.Duration) <-chan time.Time {
+ if timeout == Forever {
+ return nil
+ } else {
+ return time.After(timeout)
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/error.go b/proton-c/bindings/go/src/qpid.apache.org/internal/error.go
new file mode 100644
index 0000000..1b108e6
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/error.go
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Internal implementation details - ignore.
+package internal
+
+// #cgo LDFLAGS: -lqpid-proton
+// #include <proton/error.h>
+// #include <proton/codec.h>
+import "C"
+
+import (
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "unsafe"
+)
+
+// Error type for proton runtime errors returned as error values.
+type Error string
+
+// Error prefixes error message with proton:
+func (e Error) Error() string {
+ return "proton: " + string(e)
+}
+
+// Errorf creates an Error with a formatted message
+func Errorf(format string, a ...interface{}) Error {
+ return Error(fmt.Sprintf(format, a...))
+}
+
+type PnErrorCode int
+
+func (e PnErrorCode) String() string {
+ switch e {
+ case C.PN_EOS:
+ return "end-of-data"
+ case C.PN_ERR:
+ return "error"
+ case C.PN_OVERFLOW:
+ return "overflow"
+ case C.PN_UNDERFLOW:
+ return "underflow"
+ case C.PN_STATE_ERR:
+ return "bad-state"
+ case C.PN_ARG_ERR:
+ return "invalid-argument"
+ case C.PN_TIMEOUT:
+ return "timeout"
+ case C.PN_INTR:
+ return "interrupted"
+ case C.PN_INPROGRESS:
+ return "in-progress"
+ default:
+ return fmt.Sprintf("unknown-error(%d)", e)
+ }
+}
+
+func PnError(p unsafe.Pointer) error {
+ e := (*C.pn_error_t)(p)
+ if e == nil || C.pn_error_code(e) == 0 {
+ return nil
+ }
+ return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e)))
+}
+
+// panicIf panics if condition is true, the panic value is Errorf(fmt, args...)
+func panicIf(condition bool, fmt string, args ...interface{}) {
+ if condition {
+ panic(Errorf(fmt, args...))
+ }
+}
+
+// ErrorHolder is a goroutine-safe error holder that keeps the first error that is set.
+type ErrorHolder struct {
+ once sync.Once
+ value atomic.Value
+}
+
+// Set the error if not already set, return the error in the Holder.
+func (e *ErrorHolder) Set(err error) {
+ if err != nil {
+ e.once.Do(func() { e.value.Store(err) })
+ }
+}
+
+// Get the error.
+func (e *ErrorHolder) Get() (err error) {
+ err, _ = e.value.Load().(error)
+ return
+}
+
+// Assert panics if condition is false with optional formatted message
+func Assert(condition bool, format ...interface{}) {
+ if !condition {
+ if len(format) > 0 {
+ panic(Errorf(format[0].(string), format[1:]...))
+ } else {
+ panic(Errorf("assertion failed"))
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel.go b/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel.go
new file mode 100644
index 0000000..77b524c
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package internal
+
+// FlexChannel acts like a channel with an automatically sized buffer, see NewFlexChannel().
+type FlexChannel struct {
+ // In channel to send to. close(In) will close the FlexChannel once buffer has drained.
+ In chan<- interface{}
+ // Out channel to receive from. Out closes when In has closed and the buffer is empty.
+ Out <-chan interface{}
+
+ in, out chan interface{}
+ buffer []interface{}
+ limit int
+}
+
+// NewFlexChannel creates a FlexChannel, a channel with an automatically-sized buffer.
+//
+// Initially the buffer size is 0, the buffer grows as needed up to limit. limit < 0 means
+// there is no limit.
+//
+func NewFlexChannel(limit int) *FlexChannel {
+ fc := &FlexChannel{
+ in: make(chan interface{}),
+ out: make(chan interface{}),
+ buffer: make([]interface{}, 0),
+ limit: limit,
+ }
+ fc.In = fc.in
+ fc.Out = fc.out
+ go fc.run()
+ return fc
+}
+
+func (fc *FlexChannel) run() {
+ defer func() { // Flush the channel on exit
+ for _, data := range fc.buffer {
+ fc.out <- data
+ }
+ close(fc.out)
+ }()
+
+ for {
+ var usein, useout chan interface{}
+ var outvalue interface{}
+ if len(fc.buffer) > 0 {
+ useout = fc.out
+ outvalue = fc.buffer[0]
+ }
+ if len(fc.buffer) < fc.limit || fc.limit < 0 {
+ usein = fc.in
+ }
+ Assert(usein != nil || useout != nil)
+ select {
+ case useout <- outvalue:
+ fc.buffer = fc.buffer[1:]
+ case data, ok := <-usein:
+ if ok {
+ fc.buffer = append(fc.buffer, data)
+ } else {
+ return
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel_test.go b/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel_test.go
new file mode 100644
index 0000000..d0e1a44
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel_test.go
@@ -0,0 +1,89 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package internal
+
+import (
+ "testing"
+)
+
+func recvall(ch <-chan interface{}) (result []interface{}) {
+ for {
+ select {
+ case x := <-ch:
+ result = append(result, x)
+ default:
+ return
+ }
+ }
+}
+
+func sendall(data []interface{}, ch chan<- interface{}) {
+}
+
+func TestFlex(t *testing.T) {
+ fc := NewFlexChannel(5)
+
+ // Test send/receve
+ go func() {
+ for i := 0; i < 4; i++ {
+ fc.In <- i
+ }
+ }()
+
+ for i := 0; i < 4; i++ {
+ j := <-fc.Out
+ if i != j {
+ t.Error("%v != %v", i, j)
+ }
+ }
+ select {
+ case x, ok := <-fc.Out:
+ t.Error("receive empty channel got", x, ok)
+ default:
+ }
+
+ // Test buffer limit
+ for i := 10; i < 15; i++ {
+ fc.In <- i
+ }
+ select {
+ case fc.In <- 0:
+ t.Error("send to full channel did not block")
+ default:
+ }
+ i := <-fc.Out
+ if i != 10 {
+ t.Error("%v != %v", i, 10)
+ }
+ fc.In <- 15
+ close(fc.In)
+
+ for i := 11; i < 16; i++ {
+ j := <-fc.Out
+ if i != j {
+ t.Error("%v != %v", i, j)
+ }
+ }
+
+ x, ok := <-fc.Out
+ if ok {
+ t.Error("Unexpected value on Out", x)
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/safemap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/safemap.go b/proton-c/bindings/go/src/qpid.apache.org/internal/safemap.go
new file mode 100644
index 0000000..3a1fe2b
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/safemap.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package internal
+
+import (
+ "sync"
+)
+
+// SafeMap is a goroutine-safe map of interface{} to interface{}.
+type SafeMap struct {
+ m map[interface{}]interface{}
+ lock sync.Mutex
+}
+
+func MakeSafeMap() SafeMap { return SafeMap{m: make(map[interface{}]interface{})} }
+
+func (m *SafeMap) Get(key interface{}) interface{} {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ return m.m[key]
+}
+
+func (m *SafeMap) GetOk(key interface{}) (interface{}, bool) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ v, ok := m.m[key]
+ return v, ok
+}
+
+func (m *SafeMap) Put(key, value interface{}) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ m.m[key] = value
+}
+
+func (m *SafeMap) Delete(key interface{}) {
+ m.lock.Lock()
+ defer m.lock.Unlock()
+ delete(m.m, key)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/uuid.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/uuid.go b/proton-c/bindings/go/src/qpid.apache.org/internal/uuid.go
new file mode 100644
index 0000000..ef941a1
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/uuid.go
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package internal
+
+import (
+ "fmt"
+ "math/rand"
+ "strconv"
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+type UUID [16]byte
+
+func (u UUID) String() string {
+ return fmt.Sprintf("%X-%X-%X-%X-%X", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
+}
+
+// Don't mess with the default random source.
+var randomSource = rand.NewSource(time.Now().UnixNano())
+var randomLock sync.Mutex
+
+func random() byte {
+ randomLock.Lock()
+ defer randomLock.Unlock()
+ return byte(randomSource.Int63())
+}
+
+func UUID4() UUID {
+ var u UUID
+ for i := 0; i < len(u); i++ {
+ u[i] = random()
+ }
+ // See /https://tools.ietf.org/html/rfc4122#section-4.4
+ u[6] = (u[6] & 0x0F) | 0x40 // Version bits to 4
+ u[8] = (u[8] & 0x3F) | 0x80 // Reserved bits (top two) set to 01
+ return u
+}
+
+// A simple atomic counter to generate unique 64 bit IDs.
+type IdCounter struct{ count uint64 }
+
+// NextInt gets the next uint64 value from the atomic counter.
+func (uc *IdCounter) NextInt() uint64 {
+ return atomic.AddUint64(&uc.count, 1)
+}
+
+// Next gets the next integer value encoded as a base32 string, safe for NUL
+// terminated C strings.
+func (uc *IdCounter) Next() string {
+ return strconv.FormatUint(uc.NextInt(), 32)
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/README.md b/proton-c/bindings/go/src/qpid.apache.org/proton/README.md
deleted file mode 100644
index ad57b47..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/README.md
+++ /dev/null
@@ -1,12 +0,0 @@
-# Go binding for proton
-
-This is a a [Go](http://golang.org) binding for proton.
-Package documentation is available at: <http://godoc.org/qpid.apache.org/proton>
-
-See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/cpp/README.md)
-for working examples and practical instructions on how to get started.
-
-Feedback is encouraged at:
-
-- Email <pr...@qpid.apache.org>
-- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go
deleted file mode 100644
index cc2cd0e..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/*
-Package amqp encodes and decodes AMQP messages and data as Go types.
-
-It follows the standard 'encoding' libraries pattern. The mapping between AMQP
-and Go types is described in the documentation of the Marshal and Unmarshal
-functions.
-
-AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/>
-*/
-package amqp
-
-// #cgo LDFLAGS: -lqpid-proton
-import "C"
-
-// This file is just for the package comment.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go
deleted file mode 100644
index 868dbf3..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-import (
- "fmt"
- "reflect"
-)
-
-// Error is an AMQP error condition. It has a name and a description.
-// It implements the Go error interface so can be returned as an error value.
-//
-// You can pass amqp.Error to methods that pass an error to a remote endpoint,
-// this gives you full control over what the remote endpoint will see.
-//
-// You can also pass any Go error to such functions, the remote peer
-// will see the equivalent of MakeError(error)
-//
-type Error struct{ Name, Description string }
-
-// Error implements the Go error interface for AMQP error errors.
-func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) }
-
-// Errorf makes a Error with name and formatted description as per fmt.Sprintf
-func Errorf(name, format string, arg ...interface{}) Error {
- return Error{name, fmt.Sprintf(format, arg...)}
-}
-
-// MakeError makes an AMQP error from a go error using the Go error type as the name
-// and the err.Error() string as the description.
-func MakeError(err error) Error {
- return Error{reflect.TypeOf(err).Name(), err.Error()}
-}
-
-var (
- InternalError = "amqp:internal-error"
- NotFound = "amqp:not-found"
- UnauthorizedAccess = "amqp:unauthorized-access"
- DecodeError = "amqp:decode-error"
- ResourceLimit = "amqp:resource-limit"
- NotAllowed = "amqp:not-allowed"
- InvalidField = "amqp:invalid-field"
- NotImplemented = "amqp:not-implemented"
- ResourceLocked = "amqp:resource-locked"
- PreerrorFailed = "amqp:preerror-failed"
- ResourceDeleted = "amqp:resource-deleted"
- IllegalState = "amqp:illegal-state"
- FrameSizeTooSmall = "amqp:frame-size-too-small"
-)
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop
deleted file mode 120000
index b2dd603..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop
+++ /dev/null
@@ -1 +0,0 @@
-../../../../../../../tests/interop
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go
deleted file mode 100644
index b36ef64..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-// Test that conversion of Go type to/from AMQP is compatible with other
-// bindings.
-//
-package amqp
-
-import (
- "bytes"
- "fmt"
- "io"
- "io/ioutil"
- "os"
- "reflect"
- "strings"
- "testing"
-)
-
-func checkEqual(want interface{}, got interface{}) error {
- if !reflect.DeepEqual(want, got) {
- return fmt.Errorf("%#v != %#v", want, got)
- }
- return nil
-}
-
-func getReader(name string) (r io.Reader) {
- r, err := os.Open("interop/" + name + ".amqp")
- if err != nil {
- panic(fmt.Errorf("Can't open %#v: %v", name, err))
- }
- return
-}
-
-func remaining(d *Decoder) string {
- remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader))
- return string(remainder)
-}
-
-// checkDecode: want is the expected value, gotPtr is a pointer to a
-// instance of the same type for Decode.
-func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) {
-
- if err := d.Decode(gotPtr); err != nil {
- t.Error("Decode failed", err)
- return
- }
- got := reflect.ValueOf(gotPtr).Elem().Interface()
- if err := checkEqual(want, got); err != nil {
- t.Error("Decode bad value:", err)
- return
- }
-
- // Try round trip encoding
- bytes, err := Marshal(want, nil)
- if err != nil {
- t.Error("Marshal failed", err)
- return
- }
- n, err := Unmarshal(bytes, gotPtr)
- if err != nil {
- t.Error("Unmarshal failed", err)
- return
- }
- if err := checkEqual(n, len(bytes)); err != nil {
- t.Error("Bad unmarshal length", err)
- return
- }
- got = reflect.ValueOf(gotPtr).Elem().Interface()
- if err = checkEqual(want, got); err != nil {
- t.Error("Bad unmarshal value", err)
- return
- }
-}
-
-func TestUnmarshal(t *testing.T) {
- bytes, err := ioutil.ReadAll(getReader("strings"))
- if err != nil {
- t.Error(err)
- }
- for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
- var got string
- n, err := Unmarshal(bytes, &got)
- if err != nil {
- t.Error(err)
- }
- if want != got {
- t.Errorf("%#v != %#v", want, got)
- }
- bytes = bytes[n:]
- }
-}
-
-func TestPrimitivesExact(t *testing.T) {
- d := NewDecoder(getReader("primitives"))
- // Decoding into exact types
- var b bool
- checkDecode(d, true, &b, t)
- checkDecode(d, false, &b, t)
- var u8 uint8
- checkDecode(d, uint8(42), &u8, t)
- var u16 uint16
- checkDecode(d, uint16(42), &u16, t)
- var i16 int16
- checkDecode(d, int16(-42), &i16, t)
- var u32 uint32
- checkDecode(d, uint32(12345), &u32, t)
- var i32 int32
- checkDecode(d, int32(-12345), &i32, t)
- var u64 uint64
- checkDecode(d, uint64(12345), &u64, t)
- var i64 int64
- checkDecode(d, int64(-12345), &i64, t)
- var f32 float32
- checkDecode(d, float32(0.125), &f32, t)
- var f64 float64
- checkDecode(d, float64(0.125), &f64, t)
-}
-
-func TestPrimitivesCompatible(t *testing.T) {
- d := NewDecoder(getReader("primitives"))
- // Decoding into compatible types
- var b bool
- var i int
- var u uint
- var f float64
- checkDecode(d, true, &b, t)
- checkDecode(d, false, &b, t)
- checkDecode(d, uint(42), &u, t)
- checkDecode(d, uint(42), &u, t)
- checkDecode(d, -42, &i, t)
- checkDecode(d, uint(12345), &u, t)
- checkDecode(d, -12345, &i, t)
- checkDecode(d, uint(12345), &u, t)
- checkDecode(d, -12345, &i, t)
- checkDecode(d, 0.125, &f, t)
- checkDecode(d, 0.125, &f, t)
-}
-
-// checkDecodeValue: want is the expected value, decode into a reflect.Value
-func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) {
-
- var got, got2 interface{}
- if err := d.Decode(&got); err != nil {
- t.Error("Decode failed", err)
- return
- }
- if err := checkEqual(want, got); err != nil {
- t.Error(err)
- return
- }
- // Try round trip encoding
- bytes, err := Marshal(got, nil)
- if err != nil {
- t.Error(err)
- return
- }
- n, err := Unmarshal(bytes, &got2)
- if err != nil {
- t.Error(err)
- return
- }
- if err := checkEqual(n, len(bytes)); err != nil {
- t.Error(err)
- return
- }
- if err := checkEqual(want, got2); err != nil {
- t.Error(err)
- return
- }
-}
-
-func TestPrimitivesInterface(t *testing.T) {
- d := NewDecoder(getReader("primitives"))
- checkDecodeInterface(d, true, t)
- checkDecodeInterface(d, false, t)
- checkDecodeInterface(d, uint8(42), t)
- checkDecodeInterface(d, uint16(42), t)
- checkDecodeInterface(d, int16(-42), t)
- checkDecodeInterface(d, uint32(12345), t)
- checkDecodeInterface(d, int32(-12345), t)
- checkDecodeInterface(d, uint64(12345), t)
- checkDecodeInterface(d, int64(-12345), t)
- checkDecodeInterface(d, float32(0.125), t)
- checkDecodeInterface(d, float64(0.125), t)
-}
-
-func TestStrings(t *testing.T) {
- d := NewDecoder(getReader("strings"))
- // Test decoding as plain Go strings
- for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
- var got string
- checkDecode(d, want, &got, t)
- }
- remains := remaining(d)
- if remains != "" {
- t.Errorf("leftover: %s", remains)
- }
-
- // Test decoding as specific string types
- d = NewDecoder(getReader("strings"))
- var bytes []byte
- var str, sym string
- checkDecode(d, []byte("abc\000defg"), &bytes, t)
- checkDecode(d, "abcdefg", &str, t)
- checkDecode(d, "abcdefg", &sym, t)
- checkDecode(d, make([]byte, 0), &bytes, t)
- checkDecode(d, "", &str, t)
- checkDecode(d, "", &sym, t)
- remains = remaining(d)
- if remains != "" {
- t.Fatalf("leftover: %s", remains)
- }
-
- // Test some error handling
- d = NewDecoder(getReader("strings"))
- var s string
- err := d.Decode(s)
- if err == nil {
- t.Fatal("Expected error")
- }
- if !strings.Contains(err.Error(), "not a pointer") {
- t.Error(err)
- }
- var i int
- err = d.Decode(&i)
- if !strings.Contains(err.Error(), "cannot unmarshal") {
- t.Error(err)
- }
- _, err = Unmarshal([]byte{}, nil)
- if !strings.Contains(err.Error(), "not enough data") {
- t.Error(err)
- }
- _, err = Unmarshal([]byte("foobar"), nil)
- if !strings.Contains(err.Error(), "invalid-argument") {
- t.Error(err)
- }
-}
-
-func TestEncodeDecode(t *testing.T) {
- type data struct {
- s string
- i int
- u8 uint8
- b bool
- f float32
- v interface{}
- }
-
- in := data{"foo", 42, 9, true, 1.234, "thing"}
-
- buf := bytes.Buffer{}
- e := NewEncoder(&buf)
- if err := e.Encode(in.s); err != nil {
- t.Error(err)
- }
- if err := e.Encode(in.i); err != nil {
- t.Error(err)
- }
- if err := e.Encode(in.u8); err != nil {
- t.Error(err)
- }
- if err := e.Encode(in.b); err != nil {
- t.Error(err)
- }
- if err := e.Encode(in.f); err != nil {
- t.Error(err)
- }
- if err := e.Encode(in.v); err != nil {
- t.Error(err)
- }
-
- var out data
- d := NewDecoder(&buf)
- if err := d.Decode(&out.s); err != nil {
- t.Error(err)
- }
- if err := d.Decode(&out.i); err != nil {
- t.Error(err)
- }
- if err := d.Decode(&out.u8); err != nil {
- t.Error(err)
- }
- if err := d.Decode(&out.b); err != nil {
- t.Error(err)
- }
- if err := d.Decode(&out.f); err != nil {
- t.Error(err)
- }
- if err := d.Decode(&out.v); err != nil {
- t.Error(err)
- }
-
- if err := checkEqual(in, out); err != nil {
- t.Error(err)
- }
-}
-
-func TestMap(t *testing.T) {
- d := NewDecoder(getReader("maps"))
-
- // Generic map
- var m Map
- checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t)
-
- // Interface as map
- var i interface{}
- checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t)
-
- d = NewDecoder(getReader("maps"))
- // Specific typed map
- var m2 map[string]int
- checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t)
-
- // Nested map
- m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}}
- bytes, err := Marshal(m, nil)
- if err != nil {
- t.Fatal(err)
- }
- _, err = Unmarshal(bytes, &i)
- if err != nil {
- t.Fatal(err)
- }
- if err = checkEqual(m, i); err != nil {
- t.Fatal(err)
- }
-}
-
-func TestList(t *testing.T) {
- d := NewDecoder(getReader("lists"))
- var l List
- checkDecode(d, List{int32(32), "foo", true}, &l, t)
- checkDecode(d, List{}, &l, t)
-}
-
-// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded as
-// as an AMQP string *inside* an AMQP binary?? Skip the test for now.
-func TODO_TestMessage(t *testing.T) {
- bytes, err := ioutil.ReadAll(getReader("message"))
- if err != nil {
- t.Fatal(err)
- }
-
- m, err := DecodeMessage(bytes)
- if err != nil {
- t.Fatal(err)
- } else {
- if err := checkEqual(m.Body(), "hello"); err != nil {
- t.Error(err)
- }
- }
-
- m2 := NewMessageWith("hello")
- bytes2, err := m2.Encode(nil)
- if err != nil {
- t.Error(err)
- } else {
- if err = checkEqual(bytes, bytes2); err != nil {
- t.Error(err)
- }
- }
-}
-
-// TODO aconway 2015-03-13: finish the full interop test
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go
deleted file mode 100644
index e393c97..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/codec.h>
-import "C"
-
-import (
- "io"
- "qpid.apache.org/proton/internal"
- "reflect"
- "unsafe"
-)
-
-func dataError(prefix string, data *C.pn_data_t) error {
- err := internal.PnError(unsafe.Pointer(C.pn_data_error(data)))
- if err != nil {
- err = internal.Errorf("%s: %s", prefix, err.(internal.Error))
- }
- return err
-}
-
-/*
-Marshal encodes a Go value as AMQP data in buffer.
-If buffer is nil, or is not large enough, a new buffer is created.
-
-Returns the buffer used for encoding with len() adjusted to the actual size of data.
-
-Go types are encoded as follows
-
- +-------------------------------------+--------------------------------------------+
- |Go type |AMQP type |
- +-------------------------------------+--------------------------------------------+
- |bool |bool |
- +-------------------------------------+--------------------------------------------+
- |int8, int16, int32, int64 (int) |byte, short, int, long (int or long) |
- +-------------------------------------+--------------------------------------------+
- |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong) |
- +-------------------------------------+--------------------------------------------+
- |float32, float64 |float, double. |
- +-------------------------------------+--------------------------------------------+
- |string |string |
- +-------------------------------------+--------------------------------------------+
- |[]byte, Binary |binary |
- +-------------------------------------+--------------------------------------------+
- |Symbol |symbol |
- +-------------------------------------+--------------------------------------------+
- |interface{} |the contained type |
- +-------------------------------------+--------------------------------------------+
- |nil |null |
- +-------------------------------------+--------------------------------------------+
- |map[K]T |map with K and T converted as above |
- +-------------------------------------+--------------------------------------------+
- |Map |map, may have mixed types for keys, values |
- +-------------------------------------+--------------------------------------------+
- |[]T |list with T converted as above |
- +-------------------------------------+--------------------------------------------+
- |List |list, may have mixed types values |
- +-------------------------------------+--------------------------------------------+
-
-The following Go types cannot be marshaled: uintptr, function, interface, channel
-
-TODO
-
-Go types: array, slice, struct, complex64/128.
-
-AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
-
-Described types.
-
-*/
-func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
- defer doRecover(&err)
- data := C.pn_data(0)
- defer C.pn_data_free(data)
- marshal(v, data)
- encode := func(buf []byte) ([]byte, error) {
- n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
- switch {
- case n == int(C.PN_OVERFLOW):
- return buf, overflow
- case n < 0:
- return buf, dataError("marshal error", data)
- default:
- return buf[:n], nil
- }
- }
- return encodeGrow(buffer, encode)
-}
-
-const minEncode = 256
-
-// overflow is returned when an encoding function can't fit data in the buffer.
-var overflow = internal.Errorf("buffer too small")
-
-// encodeFn encodes into buffer[0:len(buffer)].
-// Returns buffer with length adjusted for data encoded.
-// If buffer too small, returns overflow as error.
-type encodeFn func(buffer []byte) ([]byte, error)
-
-// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
-// Returns the final buffer.
-func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
- if buffer == nil || len(buffer) == 0 {
- buffer = make([]byte, minEncode)
- }
- var err error
- for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
- buffer = make([]byte, 2*len(buffer))
- }
- return buffer, err
-}
-
-func marshal(v interface{}, data *C.pn_data_t) {
- switch v := v.(type) {
- case nil:
- C.pn_data_put_null(data)
- case bool:
- C.pn_data_put_bool(data, C.bool(v))
- case int8:
- C.pn_data_put_byte(data, C.int8_t(v))
- case int16:
- C.pn_data_put_short(data, C.int16_t(v))
- case int32:
- C.pn_data_put_int(data, C.int32_t(v))
- case int64:
- C.pn_data_put_long(data, C.int64_t(v))
- case int:
- if unsafe.Sizeof(0) == 8 {
- C.pn_data_put_long(data, C.int64_t(v))
- } else {
- C.pn_data_put_int(data, C.int32_t(v))
- }
- case uint8:
- C.pn_data_put_ubyte(data, C.uint8_t(v))
- case uint16:
- C.pn_data_put_ushort(data, C.uint16_t(v))
- case uint32:
- C.pn_data_put_uint(data, C.uint32_t(v))
- case uint64:
- C.pn_data_put_ulong(data, C.uint64_t(v))
- case uint:
- if unsafe.Sizeof(0) == 8 {
- C.pn_data_put_ulong(data, C.uint64_t(v))
- } else {
- C.pn_data_put_uint(data, C.uint32_t(v))
- }
- case float32:
- C.pn_data_put_float(data, C.float(v))
- case float64:
- C.pn_data_put_double(data, C.double(v))
- case string:
- C.pn_data_put_string(data, pnBytes([]byte(v)))
- case []byte:
- C.pn_data_put_binary(data, pnBytes(v))
- case Binary:
- C.pn_data_put_binary(data, pnBytes([]byte(v)))
- case Symbol:
- C.pn_data_put_symbol(data, pnBytes([]byte(v)))
- case Map: // Special map type
- C.pn_data_put_map(data)
- C.pn_data_enter(data)
- for key, val := range v {
- marshal(key, data)
- marshal(val, data)
- }
- C.pn_data_exit(data)
- default:
- switch reflect.TypeOf(v).Kind() {
- case reflect.Map:
- putMap(data, v)
- case reflect.Slice:
- putList(data, v)
- default:
- panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
- }
- }
- err := dataError("marshal", data)
- if err != nil {
- panic(err)
- }
- return
-}
-
-func clearMarshal(v interface{}, data *C.pn_data_t) {
- C.pn_data_clear(data)
- marshal(v, data)
-}
-
-func putMap(data *C.pn_data_t, v interface{}) {
- mapValue := reflect.ValueOf(v)
- C.pn_data_put_map(data)
- C.pn_data_enter(data)
- for _, key := range mapValue.MapKeys() {
- marshal(key.Interface(), data)
- marshal(mapValue.MapIndex(key).Interface(), data)
- }
- C.pn_data_exit(data)
-}
-
-func putList(data *C.pn_data_t, v interface{}) {
- listValue := reflect.ValueOf(v)
- C.pn_data_put_list(data)
- C.pn_data_enter(data)
- for i := 0; i < listValue.Len(); i++ {
- marshal(listValue.Index(i).Interface(), data)
- }
- C.pn_data_exit(data)
-}
-
-// Encoder encodes AMQP values to an io.Writer
-type Encoder struct {
- writer io.Writer
- buffer []byte
-}
-
-// New encoder returns a new encoder that writes to w.
-func NewEncoder(w io.Writer) *Encoder {
- return &Encoder{w, make([]byte, minEncode)}
-}
-
-func (e *Encoder) Encode(v interface{}) (err error) {
- e.buffer, err = Marshal(v, e.buffer)
- if err == nil {
- e.writer.Write(e.buffer)
- }
- return err
-}
-
-func replace(data *C.pn_data_t, v interface{}) {
- C.pn_data_clear(data)
- marshal(v, data)
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go
deleted file mode 100644
index 20cfa02..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/types.h>
-// #include <proton/message.h>
-// #include <proton/codec.h>
-// #include <stdlib.h>
-//
-// /* Helper for setting message string fields */
-// typedef int (*set_fn)(pn_message_t*, const char*);
-// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
-// int result = set(m, s);
-// free(s);
-// return result;
-// }
-//
-import "C"
-
-import (
- "qpid.apache.org/proton/internal"
- "runtime"
- "time"
- "unsafe"
-)
-
-// Message is the interface to an AMQP message.
-type Message interface {
- // Durable indicates that any parties taking responsibility
- // for the message must durably store the content.
- Durable() bool
- SetDurable(bool)
-
- // Priority impacts ordering guarantees. Within a
- // given ordered context, higher priority messages may jump ahead of
- // lower priority messages.
- Priority() uint8
- SetPriority(uint8)
-
- // TTL or Time To Live, a message it may be dropped after this duration
- TTL() time.Duration
- SetTTL(time.Duration)
-
- // FirstAcquirer indicates
- // that the recipient of the message is the first recipient to acquire
- // the message, i.e. there have been no failed delivery attempts to
- // other acquirers. Note that this does not mean the message has not
- // been delivered to, but not acquired, by other recipients.
- FirstAcquirer() bool
- SetFirstAcquirer(bool)
-
- // DeliveryCount tracks how many attempts have been made to
- // delivery a message.
- DeliveryCount() uint32
- SetDeliveryCount(uint32)
-
- // MessageId provides a unique identifier for a message.
- // it can be an a string, an unsigned long, a uuid or a
- // binary value.
- MessageId() interface{}
- SetMessageId(interface{})
-
- UserId() string
- SetUserId(string)
-
- Address() string
- SetAddress(string)
-
- Subject() string
- SetSubject(string)
-
- ReplyTo() string
- SetReplyTo(string)
-
- // CorrelationId is set on correlated request and response messages. It can be
- // an a string, an unsigned long, a uuid or a binary value.
- CorrelationId() interface{}
- SetCorrelationId(interface{})
-
- ContentType() string
- SetContentType(string)
-
- ContentEncoding() string
- SetContentEncoding(string)
-
- // ExpiryTime indicates an absoulte time when the message may be dropped.
- // A Zero time (i.e. t.isZero() == true) indicates a message never expires.
- ExpiryTime() time.Time
- SetExpiryTime(time.Time)
-
- CreationTime() time.Time
- SetCreationTime(time.Time)
-
- GroupId() string
- SetGroupId(string)
-
- GroupSequence() int32
- SetGroupSequence(int32)
-
- ReplyToGroupId() string
- SetReplyToGroupId(string)
-
- // Instructions - AMQP delivery instructions.
- Instructions() map[string]interface{}
- SetInstructions(v map[string]interface{})
-
- // Annotations - AMQP annotations.
- Annotations() map[string]interface{}
- SetAnnotations(v map[string]interface{})
-
- // Properties - Application properties.
- Properties() map[string]interface{}
- SetProperties(v map[string]interface{})
-
- // Inferred indicates how the message content
- // is encoded into AMQP sections. If inferred is true then binary and
- // list values in the body of the message will be encoded as AMQP DATA
- // and AMQP SEQUENCE sections, respectively. If inferred is false,
- // then all values in the body of the message will be encoded as AMQP
- // VALUE sections regardless of their type.
- Inferred() bool
- SetInferred(bool)
-
- // Marshal a Go value into the message body. See amqp.Marshal() for details.
- Marshal(interface{})
-
- // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
- Unmarshal(interface{})
-
- // Body value resulting from the default unmarshalling of message body as interface{}
- Body() interface{}
-
- // Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
- // the message is encoded into it, otherwise a new buffer is created.
- // Returns the buffer containing the message.
- Encode(buffer []byte) ([]byte, error)
-
- // Decode data into this message. Overwrites an existing message content.
- Decode(buffer []byte) error
-
- // Clear the message contents.
- Clear()
-
- // Copy the contents of another message to this one.
- Copy(m Message) error
-}
-
-type message struct{ pn *C.pn_message_t }
-
-func freeMessage(m *message) {
- C.pn_message_free(m.pn)
- m.pn = nil
-}
-
-// NewMessage creates a new message instance.
-func NewMessage() Message {
- m := &message{C.pn_message()}
- runtime.SetFinalizer(m, freeMessage)
- return m
-}
-
-// NewMessageWith creates a message with value as the body. Equivalent to
-// m := NewMessage(); m.Marshal(body)
-func NewMessageWith(value interface{}) Message {
- m := NewMessage()
- m.Marshal(value)
- return m
-}
-
-func (m *message) Clear() { C.pn_message_clear(m.pn) }
-
-func (m *message) Copy(x Message) error {
- if data, err := x.Encode(nil); err == nil {
- return m.Decode(data)
- } else {
- return err
- }
-}
-
-// ==== message get functions
-
-func rewindGet(data *C.pn_data_t) (v interface{}) {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(&v, data)
- return v
-}
-
-func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(&v, data)
- return v
-}
-
-func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) }
-func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) }
-func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
-func (m *message) TTL() time.Duration {
- return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
-}
-func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) }
-func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) }
-func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) }
-func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) }
-func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) }
-func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) }
-func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
-func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
-func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) }
-func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
-
-func (m *message) ExpiryTime() time.Time {
- return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
-}
-func (m *message) CreationTime() time.Time {
- return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
-}
-func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) }
-func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) }
-func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
-
-func (m *message) Instructions() map[string]interface{} {
- return rewindMap(C.pn_message_instructions(m.pn))
-}
-func (m *message) Annotations() map[string]interface{} {
- return rewindMap(C.pn_message_annotations(m.pn))
-}
-func (m *message) Properties() map[string]interface{} {
- return rewindMap(C.pn_message_properties(m.pn))
-}
-
-// ==== message set methods
-
-func setData(v interface{}, data *C.pn_data_t) {
- C.pn_data_clear(data)
- marshal(v, data)
-}
-
-func dataString(data *C.pn_data_t) string {
- str := C.pn_string(C.CString(""))
- defer C.pn_free(unsafe.Pointer(str))
- C.pn_inspect(unsafe.Pointer(data), str)
- return C.GoString(C.pn_string_get(str))
-}
-
-func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(m.Inferred())) }
-func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) }
-func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
-func (m *message) SetTTL(d time.Duration) {
- C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
-}
-func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
-func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
-func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
-func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
-func (m *message) SetAddress(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
-}
-func (m *message) SetSubject(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
-}
-func (m *message) SetReplyTo(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
-}
-func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
-func (m *message) SetContentType(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
-}
-func (m *message) SetContentEncoding(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
-}
-func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
-func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
-func (m *message) SetGroupId(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
-}
-func (m *message) SetGroupSequence(s int32) {
- C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
-}
-func (m *message) SetReplyToGroupId(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
-}
-
-func (m *message) SetInstructions(v map[string]interface{}) {
- setData(v, C.pn_message_instructions(m.pn))
-}
-func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) }
-func (m *message) SetProperties(v map[string]interface{}) { setData(v, C.pn_message_properties(m.pn)) }
-
-// Marshal/Unmarshal body
-func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) }
-func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) }
-func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
-
-func (m *message) Decode(data []byte) error {
- m.Clear()
- if len(data) == 0 {
- return internal.Errorf("empty buffer for decode")
- }
- if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
- return internal.Errorf("decoding message: %s",
- internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn))))
- }
- return nil
-}
-
-func DecodeMessage(data []byte) (m Message, err error) {
- m = NewMessage()
- err = m.Decode(data)
- return
-}
-
-func (m *message) Encode(buffer []byte) ([]byte, error) {
- encode := func(buf []byte) ([]byte, error) {
- len := cLen(buf)
- result := C.pn_message_encode(m.pn, cPtr(buf), &len)
- switch {
- case result == C.PN_OVERFLOW:
- return buf, overflow
- case result < 0:
- return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result))
- default:
- return buf[:len], nil
- }
- }
- return encodeGrow(buffer, encode)
-}
-
-// TODO aconway 2015-09-14: Multi-section messages.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go
deleted file mode 100644
index 7a6e5a8..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-import (
- "testing"
- "time"
-)
-
-func roundTrip(m Message) error {
- buffer, err := m.Encode(nil)
- if err != nil {
- return err
- }
- m2, err := DecodeMessage(buffer)
- if err != nil {
- return err
- }
- return checkEqual(m, m2)
-}
-
-func TestDefaultMessage(t *testing.T) {
- m := NewMessage()
- // Check defaults
- for _, data := range [][]interface{}{
- {m.Inferred(), false},
- {m.Durable(), false},
- {m.Priority(), uint8(4)},
- {m.TTL(), time.Duration(0)},
- {m.UserId(), ""},
- {m.Address(), ""},
- {m.Subject(), ""},
- {m.ReplyTo(), ""},
- {m.ContentType(), ""},
- {m.ContentEncoding(), ""},
- {m.GroupId(), ""},
- {m.GroupSequence(), int32(0)},
- {m.ReplyToGroupId(), ""},
- {m.MessageId(), nil},
- {m.CorrelationId(), nil},
- {m.Instructions(), map[string]interface{}{}},
- {m.Annotations(), map[string]interface{}{}},
- {m.Properties(), map[string]interface{}{}},
- {m.Body(), nil},
- } {
- if err := checkEqual(data[0], data[1]); err != nil {
- t.Error(err)
- }
- }
- if err := roundTrip(m); err != nil {
- t.Error(err)
- }
-}
-
-func TestMessageRoundTrip(t *testing.T) {
- m := NewMessage()
- m.SetInferred(false)
- m.SetDurable(true)
- m.SetPriority(42)
- m.SetTTL(0)
- m.SetUserId("user")
- m.SetAddress("address")
- m.SetSubject("subject")
- m.SetReplyTo("replyto")
- m.SetContentType("content")
- m.SetContentEncoding("encoding")
- m.SetGroupId("group")
- m.SetGroupSequence(42)
- m.SetReplyToGroupId("replytogroup")
- m.SetMessageId("id")
- m.SetCorrelationId("correlation")
- m.SetInstructions(map[string]interface{}{"instructions": "foo"})
- m.SetAnnotations(map[string]interface{}{"annotations": "foo"})
- m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"})
- m.Marshal("hello")
-
- for _, data := range [][]interface{}{
- {m.Inferred(), false},
- {m.Durable(), true},
- {m.Priority(), uint8(42)},
- {m.TTL(), time.Duration(0)},
- {m.UserId(), "user"},
- {m.Address(), "address"},
- {m.Subject(), "subject"},
- {m.ReplyTo(), "replyto"},
- {m.ContentType(), "content"},
- {m.ContentEncoding(), "encoding"},
- {m.GroupId(), "group"},
- {m.GroupSequence(), int32(42)},
- {m.ReplyToGroupId(), "replytogroup"},
- {m.MessageId(), "id"},
- {m.CorrelationId(), "correlation"},
- {m.Instructions(), map[string]interface{}{"instructions": "foo"}},
- {m.Annotations(), map[string]interface{}{"annotations": "foo"}},
- {m.Properties(), map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}},
- {m.Body(), "hello"},
- } {
- if err := checkEqual(data[0], data[1]); err != nil {
- t.Error(err)
- }
- }
- if err := roundTrip(m); err != nil {
- t.Error(err)
- }
-}
-
-func TestMessageBodyTypes(t *testing.T) {
- var s string
- var body interface{}
- var i int64
-
- m := NewMessageWith(int64(42))
- m.Unmarshal(&body)
- m.Unmarshal(&i)
- if err := checkEqual(body.(int64), int64(42)); err != nil {
- t.Error(err)
- }
- if err := checkEqual(i, int64(42)); err != nil {
- t.Error(err)
- }
-
- m = NewMessageWith("hello")
- m.Unmarshal(&s)
- m.Unmarshal(&body)
- if err := checkEqual(s, "hello"); err != nil {
- t.Error(err)
- }
- if err := checkEqual(body.(string), "hello"); err != nil {
- t.Error(err)
- }
- if err := roundTrip(m); err != nil {
- t.Error(err)
- }
-
- m = NewMessageWith(Binary("bin"))
- m.Unmarshal(&s)
- m.Unmarshal(&body)
- if err := checkEqual(body.(Binary), Binary("bin")); err != nil {
- t.Error(err)
- }
- if err := checkEqual(s, "bin"); err != nil {
- t.Error(err)
- }
- if err := roundTrip(m); err != nil {
- t.Error(err)
- }
-
- // TODO aconway 2015-09-08: array etc.
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go
deleted file mode 100644
index 131c974..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements. See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership. The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied. See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/codec.h>
-import "C"
-
-import (
- "bytes"
- "fmt"
- "reflect"
- "time"
- "unsafe"
-)
-
-type Type C.pn_type_t
-
-func (t Type) String() string {
- switch C.pn_type_t(t) {
- case C.PN_NULL:
- return "null"
- case C.PN_BOOL:
- return "bool"
- case C.PN_UBYTE:
- return "ubyte"
- case C.PN_BYTE:
- return "byte"
- case C.PN_USHORT:
- return "ushort"
- case C.PN_SHORT:
- return "short"
- case C.PN_CHAR:
- return "char"
- case C.PN_UINT:
- return "uint"
- case C.PN_INT:
- return "int"
- case C.PN_ULONG:
- return "ulong"
- case C.PN_LONG:
- return "long"
- case C.PN_TIMESTAMP:
- return "timestamp"
- case C.PN_FLOAT:
- return "float"
- case C.PN_DOUBLE:
- return "double"
- case C.PN_DECIMAL32:
- return "decimal32"
- case C.PN_DECIMAL64:
- return "decimal64"
- case C.PN_DECIMAL128:
- return "decimal128"
- case C.PN_UUID:
- return "uuid"
- case C.PN_BINARY:
- return "binary"
- case C.PN_STRING:
- return "string"
- case C.PN_SYMBOL:
- return "symbol"
- case C.PN_DESCRIBED:
- return "described"
- case C.PN_ARRAY:
- return "array"
- case C.PN_LIST:
- return "list"
- case C.PN_MAP:
- return "map"
- case C.PN_INVALID:
- return "no-data"
- default:
- return fmt.Sprintf("unknown-type(%d)", t)
- }
-}
-
-// Go types
-var (
- bytesType = reflect.TypeOf([]byte{})
- valueType = reflect.TypeOf(reflect.Value{})
-)
-
-// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys.
-
-// Map is a generic map that can have mixed key and value types and so can represent any AMQP map
-type Map map[interface{}]interface{}
-
-// List is a generic list that can hold mixed values and can represent any AMQP list.
-//
-type List []interface{}
-
-// Symbol is a string that is encoded as an AMQP symbol
-type Symbol string
-
-func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
-
-// Binary is a string that is encoded as an AMQP binary.
-// It is a string rather than a byte[] because byte[] is not hashable and can't be used as
-// a map key, AMQP frequently uses binary types as map keys. It can convert to and from []byte
-type Binary string
-
-func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
-
-// GoString for Map prints values with their types, useful for debugging.
-func (m Map) GoString() string {
- out := &bytes.Buffer{}
- fmt.Fprintf(out, "%T{", m)
- i := len(m)
- for k, v := range m {
- fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
- i--
- if i > 0 {
- fmt.Fprint(out, ", ")
- }
- }
- fmt.Fprint(out, "}")
- return out.String()
-}
-
-// GoString for List prints values with their types, useful for debugging.
-func (l List) GoString() string {
- out := &bytes.Buffer{}
- fmt.Fprintf(out, "%T{", l)
- for i := 0; i < len(l); i++ {
- fmt.Fprintf(out, "%T(%#v)", l[i], l[i])
- if i == len(l)-1 {
- fmt.Fprint(out, ", ")
- }
- }
- fmt.Fprint(out, "}")
- return out.String()
-}
-
-// pnTime converts Go time.Time to Proton millisecond Unix time.
-func pnTime(t time.Time) C.pn_timestamp_t {
- secs := t.Unix()
- // Note: sub-second accuracy is not guaraunteed if the Unix time in
- // nanoseconds cannot be represented by an int64 (sometime around year 2260)
- msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
- return C.pn_timestamp_t(secs*1000 + msecs)
-}
-
-// goTime converts a pn_timestamp_t to a Go time.Time.
-func goTime(t C.pn_timestamp_t) time.Time {
- secs := int64(t) / 1000
- nsecs := (int64(t) % 1000) * int64(time.Millisecond)
- return time.Unix(secs, nsecs)
-}
-
-func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
- if cBytes.start != nil {
- bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
- }
- return
-}
-
-func goString(cBytes C.pn_bytes_t) (str string) {
- if cBytes.start != nil {
- str = C.GoStringN(cBytes.start, C.int(cBytes.size))
- }
- return
-}
-
-func pnBytes(b []byte) C.pn_bytes_t {
- if len(b) == 0 {
- return C.pn_bytes_t{0, nil}
- } else {
- return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))}
- }
-}
-
-func cPtr(b []byte) *C.char {
- if len(b) == 0 {
- return nil
- }
- return (*C.char)(unsafe.Pointer(&b[0]))
-}
-
-func cLen(b []byte) C.size_t {
- return C.size_t(len(b))
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org