You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/10/23 16:36:10 UTC

[08/50] [abbrv] qpid-proton git commit: PROTON-1011: Go example of event driven broker. Package renaming and some new features.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
new file mode 100644
index 0000000..92c0b90
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/receiver.go
@@ -0,0 +1,232 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"qpid.apache.org/internal"
+	"qpid.apache.org/proton"
+	"qpid.apache.org/amqp"
+	"sync"
+	"time"
+)
+
+// Receiver is a Link that receives messages.
+//
+type Receiver interface {
+	Link
+
+	// Receive blocks until a message is available or until the Receiver is closed
+	// and has no more buffered messages.
+	Receive() (ReceivedMessage, error)
+
+	// ReceiveTimeout is like Receive but gives up after timeout, see Timeout.
+	//
+	// Note that if Prefetch is false, after a Timeout the credit issued by
+	// Receive remains on the link. It will be used by the next call to Receive.
+	ReceiveTimeout(timeout time.Duration) (ReceivedMessage, error)
+
+	// Prefetch==true means the Receiver will automatically issue credit to the
+	// remote sender to keep its buffer as full as possible, i.e. it will
+	// "pre-fetch" messages independently of the application calling
+	// Receive(). This gives good throughput for applications that handle a
+	// continuous stream of messages. Larger capacity may improve throughput, the
+	// optimal value depends on the characteristics of your application.
+	//
+	// Prefetch==false means the Receiver will only issue credit when you
+	// call Receive(), and will only issue enough credit to satisfy the calls
+	// actually made. This gives lower throughput but will not fetch any messages
+	// in advance. It is good for synchronous applications that need to evaluate
+	// each message before deciding whether to receive another. The
+	// request-response pattern is a typical example.  If you make concurrent
+	// calls to Receive with pre-fetch disabled, you can improve performance by
+	// setting the capacity close to the expected number of concurrent calls.
+	//
+	Prefetch() bool
+
+	// Capacity is the size (number of messages) of the local message buffer.
+	// The buffer holds messages received but not yet returned to the
+	// application by a call to Receive().
+	Capacity() int
+
+	// SetCapacity sets Capacity and Prefetch of an accepted Receiver.
+	// May only be called in an accept() function, see Connection.Listen()
+	SetCapacity(capacity int, prefetch bool)
+}
+
+// Flow control policy for a receiver.
+type policy interface {
+	// Called at the start of Receive() to adjust credit before fetching a message.
+	Pre(*receiver)
+	// Called just before Receive() returns, after it has tried to take a message
+	// from the receiver's buffer.
+	// Non-nil error means no message was received because of an error.
+	Post(*receiver, error)
+}
+
+// prefetchPolicy keeps the link credit topped up to the free space in the
+// receiver's buffer, so messages are fetched ahead of calls to Receive.
+type prefetchPolicy struct{}
+
+// Flow asks the proton goroutine to grant whatever credit is needed to bring
+// the outstanding credit up to the free space in the buffer.
+//
+// Link.Flow adds to the existing credit, so the credit already outstanding is
+// subtracted first; otherwise repeated calls could grant more credit than the
+// buffer can hold and block the proton goroutine in message().
+func (p prefetchPolicy) Flow(r *receiver) {
+	r.engine().Inject(func() {
+		_, credit, room := r.credit()
+		if add := room - credit; add > 0 {
+			r.eLink.Flow(add)
+		}
+	})
+}
+
+// Pre tops up credit before a receive.
+func (p prefetchPolicy) Pre(r *receiver) { p.Flow(r) }
+
+// Post tops up credit after a successful receive has freed buffer space.
+func (p prefetchPolicy) Post(r *receiver, err error) {
+	if err == nil {
+		p.Flow(r)
+	}
+}
+
+// noPrefetchPolicy issues only as much credit as there are calls to Receive
+// currently in progress, so no messages are fetched ahead of demand.
+type noPrefetchPolicy struct {
+	// lock guards waiting: Pre and Post run in the callers' goroutines while
+	// the function injected by Flow runs in the proton goroutine.
+	lock sync.Mutex
+	// waiting is the number of Receive calls between Pre and Post.
+	waiting int
+}
+
+// addWaiting adjusts the count of in-progress Receive calls by n.
+func (p *noPrefetchPolicy) addWaiting(n int) {
+	p.lock.Lock()
+	p.waiting += n
+	p.lock.Unlock()
+}
+
+// Flow asks the proton goroutine to grant enough credit to satisfy the
+// waiting receivers that are not already covered by buffered messages or
+// outstanding credit.
+//
+// Methods use a pointer receiver so that the waiting count is shared between
+// calls; with a value receiver every call would update a throw-away copy.
+func (p *noPrefetchPolicy) Flow(r *receiver) { // Not called in proton goroutine
+	r.engine().Inject(func() {
+		p.lock.Lock()
+		waiting := p.waiting
+		p.lock.Unlock()
+		buffered, credit, room := r.credit()
+		add := waiting - (buffered + credit)
+		if limit := room - credit; add > limit {
+			add = limit // Don't overflow the buffer
+		}
+		if add > 0 {
+			r.eLink.Flow(add)
+		}
+	})
+}
+
+// Pre registers one more waiting receiver and issues credit for it.
+func (p *noPrefetchPolicy) Pre(r *receiver) { p.addWaiting(1); p.Flow(r) }
+
+// Post removes the receiver from the waiting count; after a successful
+// receive it re-evaluates credit for the receivers still waiting.
+func (p *noPrefetchPolicy) Post(r *receiver, err error) {
+	p.addWaiting(-1)
+	if err == nil {
+		p.Flow(r)
+	}
+}
+
+// Receiver implementation
+type receiver struct {
+	link
+	buffer    chan ReceivedMessage // Messages received but not yet returned by Receive; created by setup().
+	policy    policy               // Credit policy, chosen by setup() from the prefetch setting.
+	setupOnce sync.Once            // Runs setup() once, from open() or the first ReceiveTimeout().
+}
+
+// SetCapacity records the buffer capacity and prefetch mode to be applied
+// when the receiver is set up. It asserts that the receiver is currently
+// inside an accept function.
+func (r *receiver) SetCapacity(capacity int, prefetch bool) {
+	internal.Assert(r.inAccept, "Receiver.SetCapacity called outside of accept function")
+	r.prefetch, r.capacity = prefetch, capacity
+}
+
+// setup creates the message buffer and selects the credit policy. The
+// capacity is raised to at least 1 so the buffer can always hold a message.
+func (r *receiver) setup() {
+	if r.capacity < 1 {
+		r.capacity = 1
+	}
+	r.buffer = make(chan ReceivedMessage, r.capacity)
+	switch {
+	case r.prefetch:
+		r.policy = &prefetchPolicy{}
+	default:
+		r.policy = &noPrefetchPolicy{}
+	}
+}
+
+// credit reports the number of buffered messages, the link's current credit
+// and the free space left in the buffer.
+//
+// call in proton goroutine.
+func (r *receiver) credit() (buffered, credit, max int) {
+	buffered = len(r.buffer)
+	credit = r.eLink.Credit()
+	max = cap(r.buffer) - len(r.buffer)
+	return buffered, credit, max
+}
+
+// Capacity returns the capacity of the message buffer channel; this is 0
+// while the buffer has not yet been created by setup().
+func (r *receiver) Capacity() int  { return cap(r.buffer) }
+
+// Prefetch returns the receiver's prefetch setting.
+func (r *receiver) Prefetch() bool { return r.prefetch }
+
+// Receive is ReceiveTimeout with a timeout of Forever.
+func (r *receiver) Receive() (rm ReceivedMessage, err error) {
+	return r.ReceiveTimeout(Forever)
+}
+
+// ReceiveTimeout waits up to timeout for a message from the buffer.
+//
+// It returns Timeout if the wait expired, or r.Error() if the buffer was
+// closed with nothing left in it. The credit policy is consulted before the
+// wait and again, with the final error, just before returning.
+func (r *receiver) ReceiveTimeout(timeout time.Duration) (rm ReceivedMessage, err error) {
+	r.setupOnce.Do(r.setup)
+	internal.Assert(r.buffer != nil, "Receiver is not open: %s", r)
+	r.policy.Pre(r)
+	defer func() { r.policy.Post(r, err) }()
+	value, open, expired := timedReceive(r.buffer, timeout)
+	if expired {
+		return ReceivedMessage{}, Timeout
+	}
+	if !open {
+		return ReceivedMessage{}, r.Error()
+	}
+	return value.(ReceivedMessage), nil
+}
+
+// message decodes an incoming delivery and queues it for Receive.
+// The link is closed locally if the remote end has already closed, if the
+// message cannot be decoded, or if it arrives in excess of the credit issued.
+//
+// Called in proton goroutine on MMessage event.
+func (r *receiver) message(delivery proton.Delivery) {
+	if r.eLink.State().RemoteClosed() {
+		localClose(r.eLink, r.eLink.RemoteCondition().Error())
+		return
+	}
+	if !delivery.HasMessage() {
+		return
+	}
+	m, err := delivery.Message()
+	if err != nil {
+		localClose(r.eLink, err)
+		return
+	}
+	internal.Assert(m != nil)
+	r.eLink.Advance()
+	if r.eLink.Credit() < 0 {
+		localClose(r.eLink, internal.Errorf("received message in excess of credit limit"))
+		return
+	}
+	// We never issue more credit than cap(buffer) so this will not block.
+	r.buffer <- ReceivedMessage{m, delivery, r}
+}
+
+// open makes sure the buffer and policy exist, opens the underlying link and
+// registers the receiver with the handler under its proton link.
+func (r *receiver) open() {
+	r.setupOnce.Do(r.setup)
+	r.link.open()
+	r.handler().addLink(r.eLink, r)
+}
+
+// closed marks the link closed with err and closes the message buffer (if it
+// was created) so that blocked Receive calls wake up once it has drained.
+func (r *receiver) closed(err error) {
+	r.link.closed(err)
+	if r.buffer == nil {
+		return
+	}
+	close(r.buffer)
+}
+
+// ReceivedMessage contains an amqp.Message and allows the message to be acknowledged.
+type ReceivedMessage struct {
+	// Message is the received message.
+	Message amqp.Message
+
+	// eDelivery is the proton delivery the message arrived on; Acknowledge settles it.
+	eDelivery proton.Delivery
+	// receiver is the Receiver that produced this message.
+	receiver Receiver
+}
+
+// Acknowledge settles the message's delivery with the given disposition code.
+// The settlement runs in the proton goroutine; the result is the receiver's
+// error state at that point.
+func (rm *ReceivedMessage) Acknowledge(disposition Disposition) error {
+	rcv := rm.receiver.(*receiver)
+	settle := func() error {
+		// Settle doesn't return an error but if the receiver is broken the settlement won't happen.
+		rm.eDelivery.SettleAs(uint64(disposition))
+		return rm.receiver.Error()
+	}
+	return rcv.engine().InjectWait(settle)
+}
+
+// Accept is short for Acknowledge(Accepted)
+func (rm *ReceivedMessage) Accept() error { return rm.Acknowledge(Accepted) }
+
+// Reject is short for Acknowledge(Rejected)
+func (rm *ReceivedMessage) Reject() error { return rm.Acknowledge(Rejected) }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
new file mode 100644
index 0000000..3124f74
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/sender.go
@@ -0,0 +1,319 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+// #include <proton/disposition.h>
+import "C"
+
+import (
+	"container/list"
+	"qpid.apache.org/internal"
+	"qpid.apache.org/proton"
+	"qpid.apache.org/amqp"
+	"reflect"
+	"time"
+)
+
+// Sender is a Link that sends messages.
+type Sender interface {
+	Link
+
+	// Send a message without waiting for acknowledgement. Returns a SentMessage.
+	// Use SentMessage.Disposition() to wait for acknowledgement and get the
+	// disposition code.
+	//
+	// If the send buffer is full, Send blocks until there is space in the buffer.
+	Send(m amqp.Message) (sm SentMessage, err error)
+
+	// SendTimeout is like Send but only waits up to timeout for buffer space.
+	//
+	// Returns Timeout error if the timeout expires and the message has not been sent.
+	SendTimeout(m amqp.Message, timeout time.Duration) (sm SentMessage, err error)
+
+	// Send a message and forget it, there will be no acknowledgement.
+	// If the send buffer is full, SendForget blocks until there is space in the buffer.
+	SendForget(m amqp.Message) error
+
+	// SendForgetTimeout is like SendForget but only waits up to timeout for buffer space.
+	// Returns Timeout error if the timeout expires and the message has not been sent.
+	SendForgetTimeout(m amqp.Message, timeout time.Duration) error
+
+	// Credit indicates how many messages the receiving end of the link can accept.
+	//
+	// On a Sender credit can be negative, meaning that messages in excess of the
+	// receiver's credit limit have been buffered locally till credit is available.
+	Credit() (int, error)
+}
+
+// sendMessage pairs a message to be sent with the SentMessage used to track
+// it. sm is nil when no acknowledgement is to be tracked.
+type sendMessage struct {
+	m  amqp.Message
+	sm SentMessage
+}
+
+// sender is the Sender implementation.
+type sender struct {
+	link
+	blocked list.List // List of chan sendMessage, one per send waiting for credit.
+}
+
+// Disposition indicates the outcome of a settled message delivery.
+type Disposition uint64
+
+// NOTE(review): only NoDisposition is declared with type Disposition; the
+// other constants take whatever type the proton constants have — confirm
+// that is intended.
+const (
+	// No disposition available: pre-settled, not yet acknowledged or an error occurred
+	NoDisposition Disposition = 0
+	// Message was accepted by the receiver
+	Accepted = proton.Accepted
+	// Message was rejected as invalid by the receiver
+	Rejected = proton.Rejected
+	// Message was not processed by the receiver but may be processed by some other receiver.
+	Released = proton.Released
+)
+
+// String returns a human readable name for a Disposition, or "unknown" for a
+// value that is not one of the constants declared in this package.
+func (d Disposition) String() string {
+	switch d {
+	case NoDisposition:
+		return "no-disposition"
+	case Accepted:
+		return "accepted"
+	case Rejected:
+		return "rejected"
+	case Released:
+		return "released"
+	default:
+		return "unknown"
+	}
+}
+
+// Send a message, assumes there is credit
+//
+// The type of snd.sm decides what happens to the delivery: with a nil
+// SentMessage it is settled straight away, with a *sentMessage it is recorded
+// in the handler's sentMessages map until it is settled.
+func (s *sender) doSend(snd sendMessage) {
+	delivery, err := s.eLink.Send(snd.m)
+	switch sm := snd.sm.(type) {
+	case nil:
+		// NOTE(review): err is not examined on this path, so a failed send is
+		// neither reported nor kept from Settle() — confirm this is safe.
+		delivery.Settle()
+	case *sentMessage:
+		sm.delivery = delivery
+		if err != nil {
+			// Finish the SentMessage immediately with the send error.
+			sm.settled(err)
+		} else {
+			s.handler().sentMessages[delivery] = sm
+		}
+	default:
+		internal.Assert(false, "bad SentMessage type %T", snd.sm)
+	}
+}
+
+// popBlocked removes and returns the channel at the front of the blocked
+// list, or nil if the list is empty.
+func (s *sender) popBlocked() chan sendMessage {
+	front := s.blocked.Front()
+	if front == nil {
+		return nil
+	}
+	return s.blocked.Remove(front).(chan sendMessage)
+}
+
+// Send is SendTimeout with a timeout of Forever.
+func (s *sender) Send(m amqp.Message) (SentMessage, error) {
+	return s.SendTimeout(m, Forever)
+}
+
+// SendTimeout sends m, waiting up to timeout for it to be sent. A SentMessage
+// is created for tracking the acknowledgement unless the link's sender
+// settle mode is SndSettled, in which case the returned SentMessage is nil.
+func (s *sender) SendTimeout(m amqp.Message, timeout time.Duration) (SentMessage, error) {
+	var tracker SentMessage // Stays nil for SndSettled links.
+	if s.sndSettle != SndSettled {
+		tracker = newSentMessage(s.session.connection)
+	}
+	return s.sendInternal(sendMessage{m, tracker}, timeout)
+}
+
+// SendForget is SendForgetTimeout with a timeout of Forever.
+func (s *sender) SendForget(m amqp.Message) error {
+	return s.SendForgetTimeout(m, Forever)
+}
+
+// SendForgetTimeout sends m without a SentMessage, so no acknowledgement is
+// tracked, waiting up to timeout for the message to be sent.
+func (s *sender) SendForgetTimeout(m amqp.Message, timeout time.Duration) error {
+	_, err := s.sendInternal(sendMessage{m: m}, timeout)
+	return err
+}
+
+// sendInternal sends snd, waiting up to timeout for link credit.
+//
+// It returns the sender's error if the sender has already failed, Timeout if
+// the message could not be sent in time, and otherwise snd.sm (which may be
+// nil) with a nil error.
+func (s *sender) sendInternal(snd sendMessage, timeout time.Duration) (SentMessage, error) {
+	if s.Error() != nil {
+		return nil, s.Error()
+	}
+	var err error
+	if timeout == 0 {
+		// Non-blocking: send in the proton goroutine only if credit is
+		// available right now, otherwise report Timeout.
+		err = s.engine().InjectWait(func() error {
+			if s.eLink.Credit() > 0 {
+				s.doSend(snd)
+				return nil
+			}
+			return Timeout
+		})
+	} else {
+		buf := make(chan sendMessage)
+		done := make(chan struct{})
+		// NOTE(review): buf is closed on every return, including the Timeout and
+		// s.done paths where it may still be (or later be put) on s.blocked.
+		// The code that drains s.blocked is not visible here — confirm it
+		// copes with a closed channel.
+		defer close(buf)
+		// NOTE(review): the injected function may run after this call has
+		// already returned Timeout; it would then still send the message if
+		// credit is available — confirm whether that is acceptable.
+		s.engine().Inject(func() { // Runs concurrently
+			if s.eLink.Credit() > 0 {
+				s.doSend(snd)
+				close(done) // Signal already sent
+			} else {
+				s.blocked.PushBack(buf)
+			}
+		})
+		select {
+		case <-done: // Sent without blocking
+		case buf <- snd: // Sent via blocking channel
+		case <-s.done:
+			err = s.Error()
+		case <-After(timeout):
+			err = Timeout
+		}
+	}
+	if err != nil {
+		return nil, err
+	}
+	return snd.sm, nil
+}
+
+// closed forwards the close notification, with its error, to the embedded link.
+func (s *sender) closed(err error) {
+	s.link.closed(err)
+}
+
+// open opens the underlying link and registers the sender with the handler
+// under its proton link.
+func (s *sender) open() {
+	s.link.open()
+	s.handler().addLink(s.eLink, s)
+}
+
+// SentMessage represents a previously sent message. It allows you to wait for acknowledgement.
+type SentMessage interface {
+
+	// Disposition blocks till the message is acknowledged and returns the
+	// disposition state.
+	//
+	// NoDisposition with Error() != nil means the Connection was closed before
+	// the message was acknowledged.
+	//
+	// NoDisposition with Error() == nil means the message was pre-settled or
+	// Forget() was called.
+	Disposition() (Disposition, error)
+
+	// DispositionTimeout is like Disposition but gives up after timeout, see Timeout.
+	DispositionTimeout(time.Duration) (Disposition, error)
+
+	// Forget interrupts any call to Disposition on this SentMessage and tells the
+	// peer we are no longer interested in the disposition of this message.
+	Forget()
+
+	// Error returns the error that closed the disposition, or nil if there was no error.
+	// If the disposition closed because the connection closed, it will return Closed.
+	Error() error
+
+	// Value is an optional value you wish to associate with the SentMessage. It
+	// can be the message itself or some form of identifier.
+	Value() interface{}
+
+	// SetValue sets the value returned by Value.
+	SetValue(interface{})
+}
+
+// SentMessageSet is a set of sent messages that can be checked
+// to get the next completed sent message
+//
+// NOTE(review): this was described as concurrent-safe, but no locking is
+// visible around cases and sm — confirm before sharing a set between goroutines.
+type SentMessageSet struct {
+	cases []reflect.SelectCase // One receive case per entry of sm, at the same index.
+	sm    []SentMessage        // The messages in the set.
+	done  chan SentMessage     // NOTE(review): not used by Add or Wait.
+}
+
+// Add puts sm in the set together with a select case that becomes ready when
+// sm's done channel is closed. sm must be a *sentMessage.
+func (s *SentMessageSet) Add(sm SentMessage) {
+	s.sm = append(s.sm, sm)
+	finished := reflect.ValueOf(sm.(*sentMessage).done)
+	s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectRecv, Chan: finished})
+}
+
+// Wait waits up to timeout and returns the next SentMessage that has a valid dispositionb
+// or an error.
+func (s *SentMessageSet) Wait(sm SentMessage, timeout time.Duration) (SentMessage, error) {
+	s.cases = s.cases[:len(s.sm)] // Remove previous timeout cases
+	if timeout == 0 {             // Non-blocking
+		s.cases = append(s.cases, reflect.SelectCase{Dir: reflect.SelectDefault})
+	} else {
+		s.cases = append(s.cases,
+			reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(After(timeout))})
+	}
+	chosen, _, _ := reflect.Select(s.cases)
+	if chosen > len(s.sm) {
+		return nil, Timeout
+	} else {
+		sm := s.sm[chosen]
+		s.sm = append(s.sm[:chosen], s.sm[chosen+1:]...)
+		return sm, nil
+	}
+}
+
+// SentMessage implementation
+type sentMessage struct {
+	connection  *connection     // Connection whose engine and handler Forget uses.
+	done        chan struct{}   // Closed by finish() when the outcome is known.
+	delivery    proton.Delivery // Set by doSend once the message has been sent.
+	disposition Disposition     // Remote disposition, recorded by settled().
+	err         error           // Error recorded by settled(), if any.
+	value       interface{}     // Application value, see SetValue/Value.
+}
+
+// newSentMessage creates a sentMessage for connection c with an open done channel.
+func newSentMessage(c *connection) *sentMessage {
+	return &sentMessage{connection: c, done: make(chan struct{})}
+}
+
+// SetValue stores an application-defined value with the message.
+func (sm *sentMessage) SetValue(v interface{}) { sm.value = v }
+
+// Value returns the value stored by SetValue, or nil if none was stored.
+func (sm *sentMessage) Value() interface{}     { return sm.value }
+// Disposition blocks until the done channel is closed, then returns the
+// recorded disposition and error.
+func (sm *sentMessage) Disposition() (Disposition, error) {
+	<-sm.done
+	return sm.disposition, sm.err
+}
+
+// DispositionTimeout waits up to timeout for the done channel to be closed.
+// It returns the current disposition with Timeout if the wait expired, and
+// the recorded disposition and error otherwise.
+func (sm *sentMessage) DispositionTimeout(timeout time.Duration) (Disposition, error) {
+	_, _, expired := timedReceive(sm.done, timeout)
+	if expired {
+		return sm.disposition, Timeout
+	}
+	return sm.disposition, sm.err
+}
+
+// Forget asks the proton goroutine to settle the delivery and drop it from
+// the handler's sentMessages map, then finishes the message so that callers
+// waiting in Disposition are released. It does not wait for the injected
+// function to run.
+func (sm *sentMessage) Forget() {
+	sm.connection.engine.Inject(func() {
+		sm.delivery.Settle()
+		delete(sm.connection.handler.sentMessages, sm.delivery)
+	})
+	sm.finish()
+}
+
+// settled records the outcome of the message and finishes it. The remote
+// disposition is stored only if the delivery reports itself settled; err is
+// stored unconditionally.
+func (sm *sentMessage) settled(err error) {
+	if sm.delivery.Settled() {
+		sm.disposition = Disposition(sm.delivery.Remote().Type())
+	}
+	sm.err = err
+	sm.finish()
+}
+
+// finish closes the done channel unless it is already closed.
+//
+// NOTE(review): the check and the close are separate steps. finish is reached
+// from Forget and from settled; if those can run in different goroutines at
+// the same time both could reach close() — confirm.
+func (sm *sentMessage) finish() {
+	select {
+	case <-sm.done: // No-op if already closed
+	default:
+		close(sm.done)
+	}
+}
+
+// Error returns the error recorded by settled, or nil if there is none.
+func (sm *sentMessage) Error() error { return sm.err }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/session.go b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
new file mode 100644
index 0000000..612658a
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/session.go
@@ -0,0 +1,98 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"qpid.apache.org/proton"
+)
+
+// Session is an AMQP session, it contains Senders and Receivers.
+//
+type Session interface {
+	Endpoint
+
+	// Sender opens a new Sender. The link is configured by the LinkSetting
+	// arguments.
+	Sender(...LinkSetting) (Sender, error)
+
+	// Receiver opens a new Receiver. The link is configured by the LinkSetting
+	// arguments.
+	Receiver(...LinkSetting) (Receiver, error)
+
+	// SetCapacity sets the session buffer capacity in bytes.
+	// Only has effect if called in an accept() function, see Connection.Listen()
+	SetCapacity(bytes uint)
+}
+
+// session is the Session implementation.
+type session struct {
+	endpoint
+	eSession   proton.Session // Underlying proton session.
+	connection *connection    // Connection that owns this session.
+	capacity   uint           // Value recorded by SetCapacity.
+}
+
+// newSession wraps the proton session es, owned by connection c. The
+// endpoint's string form is taken from es.
+//
+// in proton goroutine
+func newSession(c *connection, es proton.Session) *session {
+	s := &session{eSession: es, connection: c}
+	s.endpoint = endpoint{str: es.String()}
+	return s
+}
+
+// Connection returns the connection that owns the session.
+func (s *session) Connection() Connection     { return s.connection }
+
+// eEndpoint returns the underlying proton session as a proton.Endpoint.
+func (s *session) eEndpoint() proton.Endpoint { return s.eSession }
+
+// engine returns the owning connection's proton engine.
+func (s *session) engine() *proton.Engine     { return s.connection.engine }
+
+// Close asks the proton goroutine to close the session locally with err.
+// It does not wait for the close to happen.
+func (s *session) Close(err error) {
+	s.engine().Inject(func() { localClose(s.eSession, err) })
+}
+
+// SetCapacity records the requested session buffer capacity in bytes.
+func (s *session) SetCapacity(bytes uint) { s.capacity = bytes }
+
+// Sender creates and opens a sending link on this session, configured by
+// setting. The work runs in the proton goroutine; the call waits for it.
+func (s *session) Sender(setting ...LinkSetting) (snd Sender, err error) {
+	err = s.engine().InjectWait(func() error {
+		l, linkErr := localLink(s, true, setting...)
+		if linkErr != nil {
+			return linkErr
+		}
+		opened := &sender{link: *l}
+		snd = opened
+		opened.open()
+		return nil
+	})
+	return snd, err
+}
+
+// Receiver creates and opens a receiving link on this session, configured by
+// setting. The work runs in the proton goroutine; the call waits for it.
+func (s *session) Receiver(setting ...LinkSetting) (rcv Receiver, err error) {
+	err = s.engine().InjectWait(func() error {
+		l, linkErr := localLink(s, false, setting...)
+		if linkErr != nil {
+			return linkErr
+		}
+		opened := &receiver{link: *l}
+		rcv = opened
+		opened.open()
+		return nil
+	})
+	return rcv, err
+}
+
+// Called from handler on closed.
+//
+// Sets err and then Closed on the session's error holder. Presumably the
+// holder keeps only the first non-nil error, so Closed applies only when err
+// is nil — confirm against the endpoint type.
+func (s *session) closed(err error) {
+	s.err.Set(err)
+	s.err.Set(Closed)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/time.go b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
new file mode 100644
index 0000000..ee61332
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/time.go
@@ -0,0 +1,81 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package electron
+
+import (
+	"qpid.apache.org/internal"
+	"reflect"
+	"time"
+)
+
+// Timeout is the error returned if an operation does not complete on time.
+//
+// Methods named *Timeout in this package take a time.Duration timeout parameter.
+//
+// If timeout > 0 and there is no result available before the timeout, they
+// return a zero or nil value and Timeout as an error.
+//
+// If timeout == 0 they will return a result if one is immediately available or
+// nil/zero and Timeout as an error if not.
+//
+// If timeout == Forever the function will return only when there is a result or
+// some non-timeout error occurs.
+//
+var Timeout = internal.Errorf("timeout")
+
+// Forever can be used as a timeout parameter to indicate wait forever.
+const Forever time.Duration = -1
+
+// timedReceive receives on channel (which can be a chan of any type), waiting
+// up to timeout.
+//
+// timeout==0 means do a non-blocking receive attempt. timeout < 0 means block
+// forever. Other values mean block up to the timeout.
+//
+// If a receive happened, value is the received value, ok is false only if the
+// channel was closed, and timedout is false. Otherwise value is nil, ok is
+// false and timedout is true.
+func timedReceive(channel interface{}, timeout time.Duration) (value interface{}, ok bool, timedout bool) {
+	cases := []reflect.SelectCase{
+		{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(channel)},
+	}
+	switch {
+	case timeout == 0: // Non-blocking receive
+		cases = append(cases, reflect.SelectCase{Dir: reflect.SelectDefault})
+	case timeout < 0: // Block forever, nothing to add. Covers every negative value, as documented.
+	default: // Block up to timeout
+		cases = append(cases,
+			reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(time.After(timeout))})
+	}
+	chosen, recv, recvOk := reflect.Select(cases)
+	if chosen != 0 {
+		// The default or timer case fired instead of the channel.
+		return nil, false, true
+	}
+	return recv.Interface(), recvOk, false
+}
+
+// After is like time.After but returns a nil channel if timeout == Forever
+// since selecting on a nil channel will never return.
+func After(timeout time.Duration) <-chan time.Time {
+	if timeout != Forever {
+		return time.After(timeout)
+	}
+	return nil
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/error.go b/proton-c/bindings/go/src/qpid.apache.org/internal/error.go
new file mode 100644
index 0000000..1b108e6
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/error.go
@@ -0,0 +1,118 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Internal implementation details - ignore.
+package internal
+
+// #cgo LDFLAGS: -lqpid-proton
+// #include <proton/error.h>
+// #include <proton/codec.h>
+import "C"
+
+import (
+	"fmt"
+	"sync"
+	"sync/atomic"
+	"unsafe"
+)
+
+// Error is the string-based error type used for proton runtime errors.
+type Error string
+
+// Error returns the message prefixed with "proton: ".
+func (e Error) Error() string {
+	const prefix = "proton: "
+	return prefix + string(e)
+}
+
+// Errorf builds an Error from a fmt-style format and arguments.
+func Errorf(format string, a ...interface{}) Error {
+	msg := fmt.Sprintf(format, a...)
+	return Error(msg)
+}
+
+// PnErrorCode is a proton C library error code.
+type PnErrorCode int
+
+// String returns a short name for the error code. Codes that are not one of
+// the C constants listed here are formatted as "unknown-error(N)".
+func (e PnErrorCode) String() string {
+	switch e {
+	case C.PN_EOS:
+		return "end-of-data"
+	case C.PN_ERR:
+		return "error"
+	case C.PN_OVERFLOW:
+		return "overflow"
+	case C.PN_UNDERFLOW:
+		return "underflow"
+	case C.PN_STATE_ERR:
+		return "bad-state"
+	case C.PN_ARG_ERR:
+		return "invalid-argument"
+	case C.PN_TIMEOUT:
+		return "timeout"
+	case C.PN_INTR:
+		return "interrupted"
+	case C.PN_INPROGRESS:
+		return "in-progress"
+	default:
+		return fmt.Sprintf("unknown-error(%d)", e)
+	}
+}
+
+// PnError converts p, treated as a pointer to a C pn_error_t, into a Go
+// error made of the code name and the error text. It returns nil if p is nil
+// or the error code is 0.
+func PnError(p unsafe.Pointer) error {
+	e := (*C.pn_error_t)(p)
+	if e == nil || C.pn_error_code(e) == 0 {
+		return nil
+	}
+	return Errorf("%s: %s", PnErrorCode(C.pn_error_code(e)), C.GoString(C.pn_error_text(e)))
+}
+
+// panicIf panics with Errorf(format, args...) when condition is true and
+// does nothing otherwise.
+func panicIf(condition bool, format string, args ...interface{}) {
+	if !condition {
+		return
+	}
+	panic(Errorf(format, args...))
+}
+
+// ErrorHolder is a goroutine-safe error holder that keeps the first error that is set.
+type ErrorHolder struct {
+	once  sync.Once
+	value atomic.Value
+}
+
+// Set stores err unless it is nil or an error has already been stored.
+func (e *ErrorHolder) Set(err error) {
+	if err == nil {
+		return
+	}
+	e.once.Do(func() { e.value.Store(err) })
+}
+
+// Get returns the stored error, or nil if none has been set.
+func (e *ErrorHolder) Get() (err error) {
+	if stored, ok := e.value.Load().(error); ok {
+		err = stored
+	}
+	return err
+}
+
+// Assert panics if condition is false. The optional arguments are a format
+// string followed by its values; without them the panic message is
+// "assertion failed".
+func Assert(condition bool, format ...interface{}) {
+	if condition {
+		return
+	}
+	if len(format) == 0 {
+		panic(Errorf("assertion failed"))
+	}
+	panic(Errorf(format[0].(string), format[1:]...))
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel.go b/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel.go
new file mode 100644
index 0000000..77b524c
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel.go
@@ -0,0 +1,82 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package internal
+
+// FlexChannel acts like a channel with an automatically sized buffer, see NewFlexChannel().
+type FlexChannel struct {
+	// In channel to send to. close(In) will close the FlexChannel once buffer has drained.
+	In chan<- interface{}
+	// Out channel to receive from. Out closes when In has closed and the buffer is empty.
+	Out <-chan interface{}
+
+	in, out chan interface{} // Bidirectional views of In and Out, used by run().
+	buffer  []interface{}    // Values received on in and not yet sent on out, oldest first.
+	limit   int              // Maximum len(buffer); negative means no limit.
+}
+
+// NewFlexChannel creates a FlexChannel, a channel with an automatically-sized buffer.
+//
+// Initially the buffer size is 0, the buffer grows as needed up to limit. limit < 0 means
+// there is no limit.
+//
+// A goroutine is started to move values from In to Out; it ends after In is
+// closed and the buffered values have been delivered.
+func NewFlexChannel(limit int) *FlexChannel {
+	in, out := make(chan interface{}), make(chan interface{})
+	fc := &FlexChannel{
+		In:     in,
+		Out:    out,
+		in:     in,
+		out:    out,
+		buffer: make([]interface{}, 0),
+		limit:  limit,
+	}
+	go fc.run()
+	return fc
+}
+
+// run moves values from in to out through the buffer until in is closed,
+// then sends whatever is still buffered and closes out.
+//
+// In each iteration a nil channel disables a select case: sending is enabled
+// only while the buffer is non-empty and receiving only while the buffer is
+// below the limit.
+func (fc *FlexChannel) run() {
+	defer func() { // Flush the channel on exit
+		for _, data := range fc.buffer {
+			fc.out <- data
+		}
+		close(fc.out)
+	}()
+
+	for {
+		var usein, useout chan interface{}
+		var outvalue interface{}
+		if len(fc.buffer) > 0 {
+			useout = fc.out
+			outvalue = fc.buffer[0]
+		}
+		if len(fc.buffer) < fc.limit || fc.limit < 0 {
+			usein = fc.in
+		}
+		// With both cases disabled the select below would block forever.
+		Assert(usein != nil || useout != nil)
+		select {
+		case useout <- outvalue:
+			// Clear the slot so the backing array does not keep the sent value alive.
+			fc.buffer[0] = nil
+			fc.buffer = fc.buffer[1:]
+		case data, ok := <-usein:
+			if !ok {
+				return
+			}
+			fc.buffer = append(fc.buffer, data)
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel_test.go b/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel_test.go
new file mode 100644
index 0000000..d0e1a44
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/flexchannel_test.go
@@ -0,0 +1,89 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package internal
+
+import (
+	"testing"
+)
+
+func recvall(ch <-chan interface{}) (result []interface{}) {
+	for {
+		select {
+		case x := <-ch:
+			result = append(result, x)
+		default:
+			return
+		}
+	}
+}
+
+func sendall(data []interface{}, ch chan<- interface{}) {
+}
+
+func TestFlex(t *testing.T) {
+	fc := NewFlexChannel(5)
+
+	// Test send/receive: values sent on In must come out of Out in order.
+	go func() {
+		for i := 0; i < 4; i++ {
+			fc.In <- i
+		}
+	}()
+
+	for i := 0; i < 4; i++ {
+		j := <-fc.Out
+		if i != j {
+			t.Errorf("%v != %v", i, j)
+		}
+	}
+	select { // Everything sent has been received, so Out must have nothing ready
+	case x, ok := <-fc.Out:
+		t.Error("receive empty channel got", x, ok)
+	default:
+	}
+
+	// Test buffer limit: fill the buffer to its limit of 5 without receiving.
+	for i := 10; i < 15; i++ {
+		fc.In <- i
+	}
+	select { // Buffer is full, so a non-blocking send must not succeed
+	case fc.In <- 0:
+		t.Error("send to full channel did not block")
+	default:
+	}
+	i := <-fc.Out
+	if i != 10 {
+		t.Errorf("%v != %v", i, 10)
+	}
+	fc.In <- 15 // One slot was freed by the receive above
+	close(fc.In)
+
+	for i := 11; i < 16; i++ { // Buffered values must still drain after In is closed
+		j := <-fc.Out
+		if i != j {
+			t.Errorf("%v != %v", i, j)
+		}
+	}
+
+	x, ok := <-fc.Out // Out closes once In is closed and the buffer is empty
+	if ok {
+		t.Error("Unexpected value on Out", x)
+	}
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/safemap.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/safemap.go b/proton-c/bindings/go/src/qpid.apache.org/internal/safemap.go
new file mode 100644
index 0000000..3a1fe2b
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/safemap.go
@@ -0,0 +1,57 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package internal
+
+import (
+	"sync"
+)
+
+// SafeMap is a goroutine-safe map of interface{} to interface{}.
+type SafeMap struct {
+	m    map[interface{}]interface{}
+	lock sync.Mutex
+}
+
+func MakeSafeMap() SafeMap { return SafeMap{m: make(map[interface{}]interface{})} }
+
+func (m *SafeMap) Get(key interface{}) interface{} {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	return m.m[key]
+}
+
+func (m *SafeMap) GetOk(key interface{}) (interface{}, bool) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	v, ok := m.m[key]
+	return v, ok
+}
+
+func (m *SafeMap) Put(key, value interface{}) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	m.m[key] = value
+}
+
+func (m *SafeMap) Delete(key interface{}) {
+	m.lock.Lock()
+	defer m.lock.Unlock()
+	delete(m.m, key)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/internal/uuid.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/internal/uuid.go b/proton-c/bindings/go/src/qpid.apache.org/internal/uuid.go
new file mode 100644
index 0000000..ef941a1
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/internal/uuid.go
@@ -0,0 +1,70 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+package internal
+
+import (
+	"fmt"
+	"math/rand"
+	"strconv"
+	"sync"
+	"sync/atomic"
+	"time"
+)
+
+type UUID [16]byte
+
+func (u UUID) String() string {
+	return fmt.Sprintf("%X-%X-%X-%X-%X", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
+}
+
+// Don't mess with the default random source.
+var randomSource = rand.NewSource(time.Now().UnixNano())
+var randomLock sync.Mutex
+
+func random() byte {
+	randomLock.Lock()
+	defer randomLock.Unlock()
+	return byte(randomSource.Int63())
+}
+
+func UUID4() UUID {
+	var u UUID
+	for i := 0; i < len(u); i++ {
+		u[i] = random()
+	}
+	// See https://tools.ietf.org/html/rfc4122#section-4.4
+	u[6] = (u[6] & 0x0F) | 0x40 // Version: high nibble set to 4
+	u[8] = (u[8] & 0x3F) | 0x80 // Variant: top two bits set to binary 10
+	return u
+}
+
+// A simple atomic counter to generate unique 64 bit IDs.
+type IdCounter struct{ count uint64 }
+
+// NextInt gets the next uint64 value from the atomic counter.
+func (uc *IdCounter) NextInt() uint64 {
+	return atomic.AddUint64(&uc.count, 1)
+}
+
+// Next gets the next integer value formatted as a base-32 numeral (strconv.FormatUint
+// with base 32), safe for NUL terminated C strings.
+func (uc *IdCounter) Next() string {
+	return strconv.FormatUint(uc.NextInt(), 32)
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/README.md b/proton-c/bindings/go/src/qpid.apache.org/proton/README.md
deleted file mode 100644
index ad57b47..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/README.md
+++ /dev/null
@@ -1,12 +0,0 @@
-# Go binding for proton
-
-This is a a [Go](http://golang.org) binding for proton.
-Package documentation is available at: <http://godoc.org/qpid.apache.org/proton>
-
-See the [examples](https://github.com/apache/qpid-proton/blob/master/examples/cpp/README.md)
-for working examples and practical instructions on how to get started.
-
-Feedback is encouraged at:
-
-- Email <pr...@qpid.apache.org>
-- Create issues <https://issues.apache.org/jira/browse/PROTON>, attach patches to an issue.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go
deleted file mode 100644
index cc2cd0e..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/doc.go
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-/*
-Package amqp encodes and decodes AMQP messages and data as Go types.
-
-It follows the standard 'encoding' libraries pattern. The mapping between AMQP
-and Go types is described in the documentation of the Marshal and Unmarshal
-functions.
-
-AMQP is an open standard for inter-operable message exchange, see <http://www.amqp.org/>
-*/
-package amqp
-
-// #cgo LDFLAGS: -lqpid-proton
-import "C"
-
-// This file is just for the package comment.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go
deleted file mode 100644
index 868dbf3..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/error.go
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-import (
-	"fmt"
-	"reflect"
-)
-
-// Error is an AMQP error condition. It has a name and a description.
-// It implements the Go error interface so can be returned as an error value.
-//
-// You can pass amqp.Error to methods that pass an error to a remote endpoint,
-// this gives you full control over what the remote endpoint will see.
-//
-// You can also pass any Go error to such functions, the remote peer
-// will see the equivalent of MakeError(error)
-//
-type Error struct{ Name, Description string }
-
-// Error implements the Go error interface for AMQP error errors.
-func (c Error) Error() string { return fmt.Sprintf("proton %s: %s", c.Name, c.Description) }
-
-// Errorf makes a Error with name and formatted description as per fmt.Sprintf
-func Errorf(name, format string, arg ...interface{}) Error {
-	return Error{name, fmt.Sprintf(format, arg...)}
-}
-
-// MakeError makes an AMQP error from a go error using the Go error type as the name
-// and the err.Error() string as the description.
-func MakeError(err error) Error {
-	return Error{reflect.TypeOf(err).Name(), err.Error()}
-}
-
-var (
-	InternalError      = "amqp:internal-error"
-	NotFound           = "amqp:not-found"
-	UnauthorizedAccess = "amqp:unauthorized-access"
-	DecodeError        = "amqp:decode-error"
-	ResourceLimit      = "amqp:resource-limit"
-	NotAllowed         = "amqp:not-allowed"
-	InvalidField       = "amqp:invalid-field"
-	NotImplemented     = "amqp:not-implemented"
-	ResourceLocked     = "amqp:resource-locked"
-	PreerrorFailed     = "amqp:preerror-failed"
-	ResourceDeleted    = "amqp:resource-deleted"
-	IllegalState       = "amqp:illegal-state"
-	FrameSizeTooSmall  = "amqp:frame-size-too-small"
-)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop
deleted file mode 120000
index b2dd603..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop
+++ /dev/null
@@ -1 +0,0 @@
-../../../../../../../tests/interop
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go
deleted file mode 100644
index b36ef64..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/interop_test.go
+++ /dev/null
@@ -1,381 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-// Test that conversion of Go type to/from AMQP is compatible with other
-// bindings.
-//
-package amqp
-
-import (
-	"bytes"
-	"fmt"
-	"io"
-	"io/ioutil"
-	"os"
-	"reflect"
-	"strings"
-	"testing"
-)
-
-func checkEqual(want interface{}, got interface{}) error {
-	if !reflect.DeepEqual(want, got) {
-		return fmt.Errorf("%#v != %#v", want, got)
-	}
-	return nil
-}
-
-func getReader(name string) (r io.Reader) {
-	r, err := os.Open("interop/" + name + ".amqp")
-	if err != nil {
-		panic(fmt.Errorf("Can't open %#v: %v", name, err))
-	}
-	return
-}
-
-func remaining(d *Decoder) string {
-	remainder, _ := ioutil.ReadAll(io.MultiReader(d.Buffered(), d.reader))
-	return string(remainder)
-}
-
-// checkDecode: want is the expected value, gotPtr is a pointer to a
-// instance of the same type for Decode.
-func checkDecode(d *Decoder, want interface{}, gotPtr interface{}, t *testing.T) {
-
-	if err := d.Decode(gotPtr); err != nil {
-		t.Error("Decode failed", err)
-		return
-	}
-	got := reflect.ValueOf(gotPtr).Elem().Interface()
-	if err := checkEqual(want, got); err != nil {
-		t.Error("Decode bad value:", err)
-		return
-	}
-
-	// Try round trip encoding
-	bytes, err := Marshal(want, nil)
-	if err != nil {
-		t.Error("Marshal failed", err)
-		return
-	}
-	n, err := Unmarshal(bytes, gotPtr)
-	if err != nil {
-		t.Error("Unmarshal failed", err)
-		return
-	}
-	if err := checkEqual(n, len(bytes)); err != nil {
-		t.Error("Bad unmarshal length", err)
-		return
-	}
-	got = reflect.ValueOf(gotPtr).Elem().Interface()
-	if err = checkEqual(want, got); err != nil {
-		t.Error("Bad unmarshal value", err)
-		return
-	}
-}
-
-func TestUnmarshal(t *testing.T) {
-	bytes, err := ioutil.ReadAll(getReader("strings"))
-	if err != nil {
-		t.Error(err)
-	}
-	for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
-		var got string
-		n, err := Unmarshal(bytes, &got)
-		if err != nil {
-			t.Error(err)
-		}
-		if want != got {
-			t.Errorf("%#v != %#v", want, got)
-		}
-		bytes = bytes[n:]
-	}
-}
-
-func TestPrimitivesExact(t *testing.T) {
-	d := NewDecoder(getReader("primitives"))
-	// Decoding into exact types
-	var b bool
-	checkDecode(d, true, &b, t)
-	checkDecode(d, false, &b, t)
-	var u8 uint8
-	checkDecode(d, uint8(42), &u8, t)
-	var u16 uint16
-	checkDecode(d, uint16(42), &u16, t)
-	var i16 int16
-	checkDecode(d, int16(-42), &i16, t)
-	var u32 uint32
-	checkDecode(d, uint32(12345), &u32, t)
-	var i32 int32
-	checkDecode(d, int32(-12345), &i32, t)
-	var u64 uint64
-	checkDecode(d, uint64(12345), &u64, t)
-	var i64 int64
-	checkDecode(d, int64(-12345), &i64, t)
-	var f32 float32
-	checkDecode(d, float32(0.125), &f32, t)
-	var f64 float64
-	checkDecode(d, float64(0.125), &f64, t)
-}
-
-func TestPrimitivesCompatible(t *testing.T) {
-	d := NewDecoder(getReader("primitives"))
-	// Decoding into compatible types
-	var b bool
-	var i int
-	var u uint
-	var f float64
-	checkDecode(d, true, &b, t)
-	checkDecode(d, false, &b, t)
-	checkDecode(d, uint(42), &u, t)
-	checkDecode(d, uint(42), &u, t)
-	checkDecode(d, -42, &i, t)
-	checkDecode(d, uint(12345), &u, t)
-	checkDecode(d, -12345, &i, t)
-	checkDecode(d, uint(12345), &u, t)
-	checkDecode(d, -12345, &i, t)
-	checkDecode(d, 0.125, &f, t)
-	checkDecode(d, 0.125, &f, t)
-}
-
-// checkDecodeValue: want is the expected value, decode into a reflect.Value
-func checkDecodeInterface(d *Decoder, want interface{}, t *testing.T) {
-
-	var got, got2 interface{}
-	if err := d.Decode(&got); err != nil {
-		t.Error("Decode failed", err)
-		return
-	}
-	if err := checkEqual(want, got); err != nil {
-		t.Error(err)
-		return
-	}
-	// Try round trip encoding
-	bytes, err := Marshal(got, nil)
-	if err != nil {
-		t.Error(err)
-		return
-	}
-	n, err := Unmarshal(bytes, &got2)
-	if err != nil {
-		t.Error(err)
-		return
-	}
-	if err := checkEqual(n, len(bytes)); err != nil {
-		t.Error(err)
-		return
-	}
-	if err := checkEqual(want, got2); err != nil {
-		t.Error(err)
-		return
-	}
-}
-
-func TestPrimitivesInterface(t *testing.T) {
-	d := NewDecoder(getReader("primitives"))
-	checkDecodeInterface(d, true, t)
-	checkDecodeInterface(d, false, t)
-	checkDecodeInterface(d, uint8(42), t)
-	checkDecodeInterface(d, uint16(42), t)
-	checkDecodeInterface(d, int16(-42), t)
-	checkDecodeInterface(d, uint32(12345), t)
-	checkDecodeInterface(d, int32(-12345), t)
-	checkDecodeInterface(d, uint64(12345), t)
-	checkDecodeInterface(d, int64(-12345), t)
-	checkDecodeInterface(d, float32(0.125), t)
-	checkDecodeInterface(d, float64(0.125), t)
-}
-
-func TestStrings(t *testing.T) {
-	d := NewDecoder(getReader("strings"))
-	// Test decoding as plain Go strings
-	for _, want := range []string{"abc\000defg", "abcdefg", "abcdefg", "", "", ""} {
-		var got string
-		checkDecode(d, want, &got, t)
-	}
-	remains := remaining(d)
-	if remains != "" {
-		t.Errorf("leftover: %s", remains)
-	}
-
-	// Test decoding as specific string types
-	d = NewDecoder(getReader("strings"))
-	var bytes []byte
-	var str, sym string
-	checkDecode(d, []byte("abc\000defg"), &bytes, t)
-	checkDecode(d, "abcdefg", &str, t)
-	checkDecode(d, "abcdefg", &sym, t)
-	checkDecode(d, make([]byte, 0), &bytes, t)
-	checkDecode(d, "", &str, t)
-	checkDecode(d, "", &sym, t)
-	remains = remaining(d)
-	if remains != "" {
-		t.Fatalf("leftover: %s", remains)
-	}
-
-	// Test some error handling
-	d = NewDecoder(getReader("strings"))
-	var s string
-	err := d.Decode(s)
-	if err == nil {
-		t.Fatal("Expected error")
-	}
-	if !strings.Contains(err.Error(), "not a pointer") {
-		t.Error(err)
-	}
-	var i int
-	err = d.Decode(&i)
-	if !strings.Contains(err.Error(), "cannot unmarshal") {
-		t.Error(err)
-	}
-	_, err = Unmarshal([]byte{}, nil)
-	if !strings.Contains(err.Error(), "not enough data") {
-		t.Error(err)
-	}
-	_, err = Unmarshal([]byte("foobar"), nil)
-	if !strings.Contains(err.Error(), "invalid-argument") {
-		t.Error(err)
-	}
-}
-
-func TestEncodeDecode(t *testing.T) {
-	type data struct {
-		s  string
-		i  int
-		u8 uint8
-		b  bool
-		f  float32
-		v  interface{}
-	}
-
-	in := data{"foo", 42, 9, true, 1.234, "thing"}
-
-	buf := bytes.Buffer{}
-	e := NewEncoder(&buf)
-	if err := e.Encode(in.s); err != nil {
-		t.Error(err)
-	}
-	if err := e.Encode(in.i); err != nil {
-		t.Error(err)
-	}
-	if err := e.Encode(in.u8); err != nil {
-		t.Error(err)
-	}
-	if err := e.Encode(in.b); err != nil {
-		t.Error(err)
-	}
-	if err := e.Encode(in.f); err != nil {
-		t.Error(err)
-	}
-	if err := e.Encode(in.v); err != nil {
-		t.Error(err)
-	}
-
-	var out data
-	d := NewDecoder(&buf)
-	if err := d.Decode(&out.s); err != nil {
-		t.Error(err)
-	}
-	if err := d.Decode(&out.i); err != nil {
-		t.Error(err)
-	}
-	if err := d.Decode(&out.u8); err != nil {
-		t.Error(err)
-	}
-	if err := d.Decode(&out.b); err != nil {
-		t.Error(err)
-	}
-	if err := d.Decode(&out.f); err != nil {
-		t.Error(err)
-	}
-	if err := d.Decode(&out.v); err != nil {
-		t.Error(err)
-	}
-
-	if err := checkEqual(in, out); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestMap(t *testing.T) {
-	d := NewDecoder(getReader("maps"))
-
-	// Generic map
-	var m Map
-	checkDecode(d, Map{"one": int32(1), "two": int32(2), "three": int32(3)}, &m, t)
-
-	// Interface as map
-	var i interface{}
-	checkDecode(d, Map{int32(1): "one", int32(2): "two", int32(3): "three"}, &i, t)
-
-	d = NewDecoder(getReader("maps"))
-	// Specific typed map
-	var m2 map[string]int
-	checkDecode(d, map[string]int{"one": 1, "two": 2, "three": 3}, &m2, t)
-
-	// Nested map
-	m = Map{int64(1): "one", "two": int32(2), true: Map{uint8(1): true, uint8(2): false}}
-	bytes, err := Marshal(m, nil)
-	if err != nil {
-		t.Fatal(err)
-	}
-	_, err = Unmarshal(bytes, &i)
-	if err != nil {
-		t.Fatal(err)
-	}
-	if err = checkEqual(m, i); err != nil {
-		t.Fatal(err)
-	}
-}
-
-func TestList(t *testing.T) {
-	d := NewDecoder(getReader("lists"))
-	var l List
-	checkDecode(d, List{int32(32), "foo", true}, &l, t)
-	checkDecode(d, List{}, &l, t)
-}
-
-// TODO aconway 2015-09-08: the message.amqp file seems to be incorrectly coded as
-// as an AMQP string *inside* an AMQP binary?? Skip the test for now.
-func TODO_TestMessage(t *testing.T) {
-	bytes, err := ioutil.ReadAll(getReader("message"))
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	m, err := DecodeMessage(bytes)
-	if err != nil {
-		t.Fatal(err)
-	} else {
-		if err := checkEqual(m.Body(), "hello"); err != nil {
-			t.Error(err)
-		}
-	}
-
-	m2 := NewMessageWith("hello")
-	bytes2, err := m2.Encode(nil)
-	if err != nil {
-		t.Error(err)
-	} else {
-		if err = checkEqual(bytes, bytes2); err != nil {
-			t.Error(err)
-		}
-	}
-}
-
-// TODO aconway 2015-03-13: finish the full interop test

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go
deleted file mode 100644
index e393c97..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/marshal.go
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/codec.h>
-import "C"
-
-import (
-	"io"
-	"qpid.apache.org/proton/internal"
-	"reflect"
-	"unsafe"
-)
-
-func dataError(prefix string, data *C.pn_data_t) error {
-	err := internal.PnError(unsafe.Pointer(C.pn_data_error(data)))
-	if err != nil {
-		err = internal.Errorf("%s: %s", prefix, err.(internal.Error))
-	}
-	return err
-}
-
-/*
-Marshal encodes a Go value as AMQP data in buffer.
-If buffer is nil, or is not large enough, a new buffer  is created.
-
-Returns the buffer used for encoding with len() adjusted to the actual size of data.
-
-Go types are encoded as follows
-
- +-------------------------------------+--------------------------------------------+
- |Go type                              |AMQP type                                   |
- +-------------------------------------+--------------------------------------------+
- |bool                                 |bool                                        |
- +-------------------------------------+--------------------------------------------+
- |int8, int16, int32, int64 (int)      |byte, short, int, long (int or long)        |
- +-------------------------------------+--------------------------------------------+
- |uint8, uint16, uint32, uint64 (uint) |ubyte, ushort, uint, ulong (uint or ulong)  |
- +-------------------------------------+--------------------------------------------+
- |float32, float64                     |float, double.                              |
- +-------------------------------------+--------------------------------------------+
- |string                               |string                                      |
- +-------------------------------------+--------------------------------------------+
- |[]byte, Binary                       |binary                                      |
- +-------------------------------------+--------------------------------------------+
- |Symbol                               |symbol                                      |
- +-------------------------------------+--------------------------------------------+
- |interface{}                          |the contained type                          |
- +-------------------------------------+--------------------------------------------+
- |nil                                  |null                                        |
- +-------------------------------------+--------------------------------------------+
- |map[K]T                              |map with K and T converted as above         |
- +-------------------------------------+--------------------------------------------+
- |Map                                  |map, may have mixed types for keys, values  |
- +-------------------------------------+--------------------------------------------+
- |[]T                                  |list with T converted as above              |
- +-------------------------------------+--------------------------------------------+
- |List                                 |list, may have mixed types  values          |
- +-------------------------------------+--------------------------------------------+
-
-The following Go types cannot be marshaled: uintptr, function, interface, channel
-
-TODO
-
-Go types: array, slice, struct, complex64/128.
-
-AMQP types: decimal32/64/128, char, timestamp, uuid, array, multi-section message bodies.
-
-Described types.
-
-*/
-func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
-	defer doRecover(&err)
-	data := C.pn_data(0)
-	defer C.pn_data_free(data)
-	marshal(v, data)
-	encode := func(buf []byte) ([]byte, error) {
-		n := int(C.pn_data_encode(data, cPtr(buf), cLen(buf)))
-		switch {
-		case n == int(C.PN_OVERFLOW):
-			return buf, overflow
-		case n < 0:
-			return buf, dataError("marshal error", data)
-		default:
-			return buf[:n], nil
-		}
-	}
-	return encodeGrow(buffer, encode)
-}
-
-const minEncode = 256
-
-// overflow is returned when an encoding function can't fit data in the buffer.
-var overflow = internal.Errorf("buffer too small")
-
-// encodeFn encodes into buffer[0:len(buffer)].
-// Returns buffer with length adjusted for data encoded.
-// If buffer too small, returns overflow as error.
-type encodeFn func(buffer []byte) ([]byte, error)
-
-// encodeGrow calls encode() into buffer, if it returns overflow grows the buffer.
-// Returns the final buffer.
-func encodeGrow(buffer []byte, encode encodeFn) ([]byte, error) {
-	if buffer == nil || len(buffer) == 0 {
-		buffer = make([]byte, minEncode)
-	}
-	var err error
-	for buffer, err = encode(buffer); err == overflow; buffer, err = encode(buffer) {
-		buffer = make([]byte, 2*len(buffer))
-	}
-	return buffer, err
-}
-
-func marshal(v interface{}, data *C.pn_data_t) {
-	switch v := v.(type) {
-	case nil:
-		C.pn_data_put_null(data)
-	case bool:
-		C.pn_data_put_bool(data, C.bool(v))
-	case int8:
-		C.pn_data_put_byte(data, C.int8_t(v))
-	case int16:
-		C.pn_data_put_short(data, C.int16_t(v))
-	case int32:
-		C.pn_data_put_int(data, C.int32_t(v))
-	case int64:
-		C.pn_data_put_long(data, C.int64_t(v))
-	case int:
-		if unsafe.Sizeof(0) == 8 {
-			C.pn_data_put_long(data, C.int64_t(v))
-		} else {
-			C.pn_data_put_int(data, C.int32_t(v))
-		}
-	case uint8:
-		C.pn_data_put_ubyte(data, C.uint8_t(v))
-	case uint16:
-		C.pn_data_put_ushort(data, C.uint16_t(v))
-	case uint32:
-		C.pn_data_put_uint(data, C.uint32_t(v))
-	case uint64:
-		C.pn_data_put_ulong(data, C.uint64_t(v))
-	case uint:
-		if unsafe.Sizeof(0) == 8 {
-			C.pn_data_put_ulong(data, C.uint64_t(v))
-		} else {
-			C.pn_data_put_uint(data, C.uint32_t(v))
-		}
-	case float32:
-		C.pn_data_put_float(data, C.float(v))
-	case float64:
-		C.pn_data_put_double(data, C.double(v))
-	case string:
-		C.pn_data_put_string(data, pnBytes([]byte(v)))
-	case []byte:
-		C.pn_data_put_binary(data, pnBytes(v))
-	case Binary:
-		C.pn_data_put_binary(data, pnBytes([]byte(v)))
-	case Symbol:
-		C.pn_data_put_symbol(data, pnBytes([]byte(v)))
-	case Map: // Special map type
-		C.pn_data_put_map(data)
-		C.pn_data_enter(data)
-		for key, val := range v {
-			marshal(key, data)
-			marshal(val, data)
-		}
-		C.pn_data_exit(data)
-	default:
-		switch reflect.TypeOf(v).Kind() {
-		case reflect.Map:
-			putMap(data, v)
-		case reflect.Slice:
-			putList(data, v)
-		default:
-			panic(internal.Errorf("cannot marshal %s to AMQP", reflect.TypeOf(v)))
-		}
-	}
-	err := dataError("marshal", data)
-	if err != nil {
-		panic(err)
-	}
-	return
-}
-
-func clearMarshal(v interface{}, data *C.pn_data_t) {
-	C.pn_data_clear(data)
-	marshal(v, data)
-}
-
-func putMap(data *C.pn_data_t, v interface{}) {
-	mapValue := reflect.ValueOf(v)
-	C.pn_data_put_map(data)
-	C.pn_data_enter(data)
-	for _, key := range mapValue.MapKeys() {
-		marshal(key.Interface(), data)
-		marshal(mapValue.MapIndex(key).Interface(), data)
-	}
-	C.pn_data_exit(data)
-}
-
-func putList(data *C.pn_data_t, v interface{}) {
-	listValue := reflect.ValueOf(v)
-	C.pn_data_put_list(data)
-	C.pn_data_enter(data)
-	for i := 0; i < listValue.Len(); i++ {
-		marshal(listValue.Index(i).Interface(), data)
-	}
-	C.pn_data_exit(data)
-}
-
-// Encoder encodes AMQP values to an io.Writer
-type Encoder struct {
-	writer io.Writer
-	buffer []byte
-}
-
-// New encoder returns a new encoder that writes to w.
-func NewEncoder(w io.Writer) *Encoder {
-	return &Encoder{w, make([]byte, minEncode)}
-}
-
-func (e *Encoder) Encode(v interface{}) (err error) {
-	e.buffer, err = Marshal(v, e.buffer)
-	if err == nil {
-		e.writer.Write(e.buffer)
-	}
-	return err
-}
-
-func replace(data *C.pn_data_t, v interface{}) {
-	C.pn_data_clear(data)
-	marshal(v, data)
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go
deleted file mode 100644
index 20cfa02..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message.go
+++ /dev/null
@@ -1,347 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/types.h>
-// #include <proton/message.h>
-// #include <proton/codec.h>
-// #include <stdlib.h>
-//
-// /* Helper for setting message string fields */
-// typedef int (*set_fn)(pn_message_t*, const char*);
-// int msg_set_str(pn_message_t* m, char* s, set_fn set) {
-//     int result = set(m, s);
-//     free(s);
-//     return result;
-// }
-//
-import "C"
-
-import (
-	"qpid.apache.org/proton/internal"
-	"runtime"
-	"time"
-	"unsafe"
-)
-
-// Message is the interface to an AMQP message.
-type Message interface {
-	// Durable indicates that any parties taking responsibility
-	// for the message must durably store the content.
-	Durable() bool
-	SetDurable(bool)
-
-	// Priority impacts ordering guarantees. Within a
-	// given ordered context, higher priority messages may jump ahead of
-	// lower priority messages.
-	Priority() uint8
-	SetPriority(uint8)
-
-	// TTL or Time To Live: a message may be dropped after this duration.
-	TTL() time.Duration
-	SetTTL(time.Duration)
-
-	// FirstAcquirer indicates
-	// that the recipient of the message is the first recipient to acquire
-	// the message, i.e. there have been no failed delivery attempts to
-	// other acquirers. Note that this does not mean the message has not
-	// been delivered to, but not acquired, by other recipients.
-	FirstAcquirer() bool
-	SetFirstAcquirer(bool)
-
-	// DeliveryCount tracks how many attempts have been made to
-	// deliver a message.
-	DeliveryCount() uint32
-	SetDeliveryCount(uint32)
-
-	// MessageId provides a unique identifier for a message.
-	// It can be a string, an unsigned long, a uuid or a
-	// binary value.
-	MessageId() interface{}
-	SetMessageId(interface{})
-
-	UserId() string
-	SetUserId(string)
-
-	Address() string
-	SetAddress(string)
-
-	Subject() string
-	SetSubject(string)
-
-	ReplyTo() string
-	SetReplyTo(string)
-
-	// CorrelationId is set on correlated request and response messages. It can be
-	// a string, an unsigned long, a uuid or a binary value.
-	CorrelationId() interface{}
-	SetCorrelationId(interface{})
-
-	ContentType() string
-	SetContentType(string)
-
-	ContentEncoding() string
-	SetContentEncoding(string)
-
-	// ExpiryTime indicates an absolute time when the message may be dropped.
-	// A zero time (i.e. t.IsZero() == true) indicates a message never expires.
-	ExpiryTime() time.Time
-	SetExpiryTime(time.Time)
-
-	CreationTime() time.Time
-	SetCreationTime(time.Time)
-
-	GroupId() string
-	SetGroupId(string)
-
-	GroupSequence() int32
-	SetGroupSequence(int32)
-
-	ReplyToGroupId() string
-	SetReplyToGroupId(string)
-
-	// Instructions - AMQP delivery instructions.
-	Instructions() map[string]interface{}
-	SetInstructions(v map[string]interface{})
-
-	// Annotations - AMQP annotations.
-	Annotations() map[string]interface{}
-	SetAnnotations(v map[string]interface{})
-
-	// Properties - Application properties.
-	Properties() map[string]interface{}
-	SetProperties(v map[string]interface{})
-
-	// Inferred indicates how the message content
-	// is encoded into AMQP sections. If inferred is true then binary and
-	// list values in the body of the message will be encoded as AMQP DATA
-	// and AMQP SEQUENCE sections, respectively. If inferred is false,
-	// then all values in the body of the message will be encoded as AMQP
-	// VALUE sections regardless of their type.
-	Inferred() bool
-	SetInferred(bool)
-
-	// Marshal a Go value into the message body. See amqp.Marshal() for details.
-	Marshal(interface{})
-
-	// Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
-	Unmarshal(interface{})
-
-	// Body value resulting from the default unmarshalling of message body as interface{}
-	Body() interface{}
-
-	// Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
-	// the message is encoded into it, otherwise a new buffer is created.
-	// Returns the buffer containing the message.
-	Encode(buffer []byte) ([]byte, error)
-
-	// Decode data into this message. Overwrites any existing message content.
-	Decode(buffer []byte) error
-
-	// Clear the message contents.
-	Clear()
-
-	// Copy the contents of another message to this one.
-	Copy(m Message) error
-}
-
-type message struct{ pn *C.pn_message_t }
-
-func freeMessage(m *message) {
-	C.pn_message_free(m.pn)
-	m.pn = nil
-}
-
-// NewMessage creates a new message instance.
-func NewMessage() Message {
-	m := &message{C.pn_message()}
-	runtime.SetFinalizer(m, freeMessage)
-	return m
-}
-
-// NewMessageWith creates a message with value as the body. Equivalent to
-//     m := NewMessage(); m.Marshal(body)
-func NewMessageWith(value interface{}) Message {
-	m := NewMessage()
-	m.Marshal(value)
-	return m
-}
-
-func (m *message) Clear() { C.pn_message_clear(m.pn) }
-
-func (m *message) Copy(x Message) error {
-	if data, err := x.Encode(nil); err == nil {
-		return m.Decode(data)
-	} else {
-		return err
-	}
-}
-
-// ==== message get functions
-
-func rewindGet(data *C.pn_data_t) (v interface{}) {
-	C.pn_data_rewind(data)
-	C.pn_data_next(data)
-	unmarshal(&v, data)
-	return v
-}
-
-func rewindMap(data *C.pn_data_t) (v map[string]interface{}) {
-	C.pn_data_rewind(data)
-	C.pn_data_next(data)
-	unmarshal(&v, data)
-	return v
-}
-
-func (m *message) Inferred() bool  { return bool(C.pn_message_is_inferred(m.pn)) }
-func (m *message) Durable() bool   { return bool(C.pn_message_is_durable(m.pn)) }
-func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
-func (m *message) TTL() time.Duration {
-	return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
-}
-func (m *message) FirstAcquirer() bool        { return bool(C.pn_message_is_first_acquirer(m.pn)) }
-func (m *message) DeliveryCount() uint32      { return uint32(C.pn_message_get_delivery_count(m.pn)) }
-func (m *message) MessageId() interface{}     { return rewindGet(C.pn_message_id(m.pn)) }
-func (m *message) UserId() string             { return goString(C.pn_message_get_user_id(m.pn)) }
-func (m *message) Address() string            { return C.GoString(C.pn_message_get_address(m.pn)) }
-func (m *message) Subject() string            { return C.GoString(C.pn_message_get_subject(m.pn)) }
-func (m *message) ReplyTo() string            { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
-func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
-func (m *message) ContentType() string        { return C.GoString(C.pn_message_get_content_type(m.pn)) }
-func (m *message) ContentEncoding() string    { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
-
-func (m *message) ExpiryTime() time.Time {
-	return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
-}
-func (m *message) CreationTime() time.Time {
-	return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
-}
-func (m *message) GroupId() string        { return C.GoString(C.pn_message_get_group_id(m.pn)) }
-func (m *message) GroupSequence() int32   { return int32(C.pn_message_get_group_sequence(m.pn)) }
-func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
-
-func (m *message) Instructions() map[string]interface{} {
-	return rewindMap(C.pn_message_instructions(m.pn))
-}
-func (m *message) Annotations() map[string]interface{} {
-	return rewindMap(C.pn_message_annotations(m.pn))
-}
-func (m *message) Properties() map[string]interface{} {
-	return rewindMap(C.pn_message_properties(m.pn))
-}
-
-// ==== message set methods
-
-func setData(v interface{}, data *C.pn_data_t) {
-	C.pn_data_clear(data)
-	marshal(v, data)
-}
-
-func dataString(data *C.pn_data_t) string {
-	str := C.pn_string(C.CString(""))
-	defer C.pn_free(unsafe.Pointer(str))
-	C.pn_inspect(unsafe.Pointer(data), str)
-	return C.GoString(C.pn_string_get(str))
-}
-
-func (m *message) SetInferred(b bool)  { C.pn_message_set_inferred(m.pn, C.bool(m.Inferred())) }
-func (m *message) SetDurable(b bool)   { C.pn_message_set_durable(m.pn, C.bool(b)) }
-func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
-func (m *message) SetTTL(d time.Duration) {
-	C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
-}
-func (m *message) SetFirstAcquirer(b bool)     { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
-func (m *message) SetDeliveryCount(c uint32)   { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
-func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
-func (m *message) SetUserId(s string)          { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
-func (m *message) SetAddress(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
-}
-func (m *message) SetSubject(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
-}
-func (m *message) SetReplyTo(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
-}
-func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
-func (m *message) SetContentType(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
-}
-func (m *message) SetContentEncoding(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
-}
-func (m *message) SetExpiryTime(t time.Time)   { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
-func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
-func (m *message) SetGroupId(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
-}
-func (m *message) SetGroupSequence(s int32) {
-	C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
-}
-func (m *message) SetReplyToGroupId(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
-}
-
-func (m *message) SetInstructions(v map[string]interface{}) {
-	setData(v, C.pn_message_instructions(m.pn))
-}
-func (m *message) SetAnnotations(v map[string]interface{}) { setData(v, C.pn_message_annotations(m.pn)) }
-func (m *message) SetProperties(v map[string]interface{})  { setData(v, C.pn_message_properties(m.pn)) }
-
-// Marshal/Unmarshal body
-func (m *message) Marshal(v interface{})   { clearMarshal(v, C.pn_message_body(m.pn)) }
-func (m *message) Unmarshal(v interface{}) { rewindUnmarshal(v, C.pn_message_body(m.pn)) }
-func (m *message) Body() (v interface{})   { m.Unmarshal(&v); return }
-
-func (m *message) Decode(data []byte) error {
-	m.Clear()
-	if len(data) == 0 {
-		return internal.Errorf("empty buffer for decode")
-	}
-	if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
-		return internal.Errorf("decoding message: %s",
-			internal.PnError(unsafe.Pointer(C.pn_message_error(m.pn))))
-	}
-	return nil
-}
-
-func DecodeMessage(data []byte) (m Message, err error) {
-	m = NewMessage()
-	err = m.Decode(data)
-	return
-}
-
-func (m *message) Encode(buffer []byte) ([]byte, error) {
-	encode := func(buf []byte) ([]byte, error) {
-		len := cLen(buf)
-		result := C.pn_message_encode(m.pn, cPtr(buf), &len)
-		switch {
-		case result == C.PN_OVERFLOW:
-			return buf, overflow
-		case result < 0:
-			return buf, internal.Errorf("cannot encode message: %s", internal.PnErrorCode(result))
-		default:
-			return buf[:len], nil
-		}
-	}
-	return encodeGrow(buffer, encode)
-}
-
-// TODO aconway 2015-09-14: Multi-section messages.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go
deleted file mode 100644
index 7a6e5a8..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/message_test.go
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-import (
-	"testing"
-	"time"
-)
-
-func roundTrip(m Message) error {
-	buffer, err := m.Encode(nil)
-	if err != nil {
-		return err
-	}
-	m2, err := DecodeMessage(buffer)
-	if err != nil {
-		return err
-	}
-	return checkEqual(m, m2)
-}
-
-func TestDefaultMessage(t *testing.T) {
-	m := NewMessage()
-	// Check defaults
-	for _, data := range [][]interface{}{
-		{m.Inferred(), false},
-		{m.Durable(), false},
-		{m.Priority(), uint8(4)},
-		{m.TTL(), time.Duration(0)},
-		{m.UserId(), ""},
-		{m.Address(), ""},
-		{m.Subject(), ""},
-		{m.ReplyTo(), ""},
-		{m.ContentType(), ""},
-		{m.ContentEncoding(), ""},
-		{m.GroupId(), ""},
-		{m.GroupSequence(), int32(0)},
-		{m.ReplyToGroupId(), ""},
-		{m.MessageId(), nil},
-		{m.CorrelationId(), nil},
-		{m.Instructions(), map[string]interface{}{}},
-		{m.Annotations(), map[string]interface{}{}},
-		{m.Properties(), map[string]interface{}{}},
-		{m.Body(), nil},
-	} {
-		if err := checkEqual(data[0], data[1]); err != nil {
-			t.Error(err)
-		}
-	}
-	if err := roundTrip(m); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestMessageRoundTrip(t *testing.T) {
-	m := NewMessage()
-	m.SetInferred(false)
-	m.SetDurable(true)
-	m.SetPriority(42)
-	m.SetTTL(0)
-	m.SetUserId("user")
-	m.SetAddress("address")
-	m.SetSubject("subject")
-	m.SetReplyTo("replyto")
-	m.SetContentType("content")
-	m.SetContentEncoding("encoding")
-	m.SetGroupId("group")
-	m.SetGroupSequence(42)
-	m.SetReplyToGroupId("replytogroup")
-	m.SetMessageId("id")
-	m.SetCorrelationId("correlation")
-	m.SetInstructions(map[string]interface{}{"instructions": "foo"})
-	m.SetAnnotations(map[string]interface{}{"annotations": "foo"})
-	m.SetProperties(map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"})
-	m.Marshal("hello")
-
-	for _, data := range [][]interface{}{
-		{m.Inferred(), false},
-		{m.Durable(), true},
-		{m.Priority(), uint8(42)},
-		{m.TTL(), time.Duration(0)},
-		{m.UserId(), "user"},
-		{m.Address(), "address"},
-		{m.Subject(), "subject"},
-		{m.ReplyTo(), "replyto"},
-		{m.ContentType(), "content"},
-		{m.ContentEncoding(), "encoding"},
-		{m.GroupId(), "group"},
-		{m.GroupSequence(), int32(42)},
-		{m.ReplyToGroupId(), "replytogroup"},
-		{m.MessageId(), "id"},
-		{m.CorrelationId(), "correlation"},
-		{m.Instructions(), map[string]interface{}{"instructions": "foo"}},
-		{m.Annotations(), map[string]interface{}{"annotations": "foo"}},
-		{m.Properties(), map[string]interface{}{"int": int32(32), "bool": true, "string": "foo"}},
-		{m.Body(), "hello"},
-	} {
-		if err := checkEqual(data[0], data[1]); err != nil {
-			t.Error(err)
-		}
-	}
-	if err := roundTrip(m); err != nil {
-		t.Error(err)
-	}
-}
-
-func TestMessageBodyTypes(t *testing.T) {
-	var s string
-	var body interface{}
-	var i int64
-
-	m := NewMessageWith(int64(42))
-	m.Unmarshal(&body)
-	m.Unmarshal(&i)
-	if err := checkEqual(body.(int64), int64(42)); err != nil {
-		t.Error(err)
-	}
-	if err := checkEqual(i, int64(42)); err != nil {
-		t.Error(err)
-	}
-
-	m = NewMessageWith("hello")
-	m.Unmarshal(&s)
-	m.Unmarshal(&body)
-	if err := checkEqual(s, "hello"); err != nil {
-		t.Error(err)
-	}
-	if err := checkEqual(body.(string), "hello"); err != nil {
-		t.Error(err)
-	}
-	if err := roundTrip(m); err != nil {
-		t.Error(err)
-	}
-
-	m = NewMessageWith(Binary("bin"))
-	m.Unmarshal(&s)
-	m.Unmarshal(&body)
-	if err := checkEqual(body.(Binary), Binary("bin")); err != nil {
-		t.Error(err)
-	}
-	if err := checkEqual(s, "bin"); err != nil {
-		t.Error(err)
-	}
-	if err := roundTrip(m); err != nil {
-		t.Error(err)
-	}
-
-	// TODO aconway 2015-09-08: array etc.
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/478ba4ea/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go b/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go
deleted file mode 100644
index 131c974..0000000
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/amqp/types.go
+++ /dev/null
@@ -1,198 +0,0 @@
-/*
-Licensed to the Apache Software Foundation (ASF) under one
-or more contributor license agreements.  See the NOTICE file
-distributed with this work for additional information
-regarding copyright ownership.  The ASF licenses this file
-to you under the Apache License, Version 2.0 (the
-"License"); you may not use this file except in compliance
-with the License.  You may obtain a copy of the License at
-
-  http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing,
-software distributed under the License is distributed on an
-"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-KIND, either express or implied.  See the License for the
-specific language governing permissions and limitations
-under the License.
-*/
-
-package amqp
-
-// #include <proton/codec.h>
-import "C"
-
-import (
-	"bytes"
-	"fmt"
-	"reflect"
-	"time"
-	"unsafe"
-)
-
-type Type C.pn_type_t
-
-func (t Type) String() string {
-	switch C.pn_type_t(t) {
-	case C.PN_NULL:
-		return "null"
-	case C.PN_BOOL:
-		return "bool"
-	case C.PN_UBYTE:
-		return "ubyte"
-	case C.PN_BYTE:
-		return "byte"
-	case C.PN_USHORT:
-		return "ushort"
-	case C.PN_SHORT:
-		return "short"
-	case C.PN_CHAR:
-		return "char"
-	case C.PN_UINT:
-		return "uint"
-	case C.PN_INT:
-		return "int"
-	case C.PN_ULONG:
-		return "ulong"
-	case C.PN_LONG:
-		return "long"
-	case C.PN_TIMESTAMP:
-		return "timestamp"
-	case C.PN_FLOAT:
-		return "float"
-	case C.PN_DOUBLE:
-		return "double"
-	case C.PN_DECIMAL32:
-		return "decimal32"
-	case C.PN_DECIMAL64:
-		return "decimal64"
-	case C.PN_DECIMAL128:
-		return "decimal128"
-	case C.PN_UUID:
-		return "uuid"
-	case C.PN_BINARY:
-		return "binary"
-	case C.PN_STRING:
-		return "string"
-	case C.PN_SYMBOL:
-		return "symbol"
-	case C.PN_DESCRIBED:
-		return "described"
-	case C.PN_ARRAY:
-		return "array"
-	case C.PN_LIST:
-		return "list"
-	case C.PN_MAP:
-		return "map"
-	case C.PN_INVALID:
-		return "no-data"
-	default:
-		return fmt.Sprintf("unknown-type(%d)", t)
-	}
-}
-
-// Go types
-var (
-	bytesType = reflect.TypeOf([]byte{})
-	valueType = reflect.TypeOf(reflect.Value{})
-)
-
-// TODO aconway 2015-04-08: can't handle AMQP maps with key types that are not valid Go map keys.
-
-// Map is a generic map that can have mixed key and value types and so can represent any AMQP map
-type Map map[interface{}]interface{}
-
-// List is a generic list that can hold mixed values and can represent any AMQP list.
-//
-type List []interface{}
-
-// Symbol is a string that is encoded as an AMQP symbol
-type Symbol string
-
-func (s Symbol) GoString() string { return fmt.Sprintf("s\"%s\"", s) }
-
-// Binary is a string that is encoded as an AMQP binary.
-// It is a string rather than a []byte because []byte is not hashable and can't be used as
-// a map key; AMQP frequently uses binary types as map keys. It can convert to and from []byte.
-type Binary string
-
-func (b Binary) GoString() string { return fmt.Sprintf("b\"%s\"", b) }
-
-// GoString for Map prints values with their types, useful for debugging.
-func (m Map) GoString() string {
-	out := &bytes.Buffer{}
-	fmt.Fprintf(out, "%T{", m)
-	i := len(m)
-	for k, v := range m {
-		fmt.Fprintf(out, "%T(%#v): %T(%#v)", k, k, v, v)
-		i--
-		if i > 0 {
-			fmt.Fprint(out, ", ")
-		}
-	}
-	fmt.Fprint(out, "}")
-	return out.String()
-}
-
-// GoString for List prints values with their types, useful for debugging.
-func (l List) GoString() string {
-	out := &bytes.Buffer{}
-	fmt.Fprintf(out, "%T{", l)
-	for i := 0; i < len(l); i++ {
-		fmt.Fprintf(out, "%T(%#v)", l[i], l[i])
-		if i == len(l)-1 {
-			fmt.Fprint(out, ", ")
-		}
-	}
-	fmt.Fprint(out, "}")
-	return out.String()
-}
-
-// pnTime converts Go time.Time to Proton millisecond Unix time.
-func pnTime(t time.Time) C.pn_timestamp_t {
-	secs := t.Unix()
-	// Note: sub-second accuracy is not guaranteed if the Unix time in
-	// nanoseconds cannot be represented by an int64 (sometime around year 2260)
-	msecs := (t.UnixNano() % int64(time.Second)) / int64(time.Millisecond)
-	return C.pn_timestamp_t(secs*1000 + msecs)
-}
-
-// goTime converts a pn_timestamp_t to a Go time.Time.
-func goTime(t C.pn_timestamp_t) time.Time {
-	secs := int64(t) / 1000
-	nsecs := (int64(t) % 1000) * int64(time.Millisecond)
-	return time.Unix(secs, nsecs)
-}
-
-func goBytes(cBytes C.pn_bytes_t) (bytes []byte) {
-	if cBytes.start != nil {
-		bytes = C.GoBytes(unsafe.Pointer(cBytes.start), C.int(cBytes.size))
-	}
-	return
-}
-
-func goString(cBytes C.pn_bytes_t) (str string) {
-	if cBytes.start != nil {
-		str = C.GoStringN(cBytes.start, C.int(cBytes.size))
-	}
-	return
-}
-
-func pnBytes(b []byte) C.pn_bytes_t {
-	if len(b) == 0 {
-		return C.pn_bytes_t{0, nil}
-	} else {
-		return C.pn_bytes_t{C.size_t(len(b)), (*C.char)(unsafe.Pointer(&b[0]))}
-	}
-}
-
-func cPtr(b []byte) *C.char {
-	if len(b) == 0 {
-		return nil
-	}
-	return (*C.char)(unsafe.Pointer(&b[0]))
-}
-
-func cLen(b []byte) C.size_t {
-	return C.size_t(len(b))
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org