You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/10/11 22:21:43 UTC

[11/12] qpid-proton git commit: PROTON-1910: [go] native Message implementation

PROTON-1910: [go] native Message implementation


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/886d2b93
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/886d2b93
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/886d2b93

Branch: refs/heads/master
Commit: 886d2b9349f74c02219b29990ee04278124f5224
Parents: ef716fa
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 11 15:20:23 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:20:23 2018 -0400

----------------------------------------------------------------------
 go/src/qpid.apache.org/amqp/message.go      | 521 +++++++++++++++--------
 go/src/qpid.apache.org/amqp/message_test.go |  10 +-
 2 files changed, 355 insertions(+), 176 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message.go b/go/src/qpid.apache.org/amqp/message.go
index e514b26..919904c 100644
--- a/go/src/qpid.apache.org/amqp/message.go
+++ b/go/src/qpid.apache.org/amqp/message.go
@@ -35,10 +35,10 @@ package amqp
 import "C"
 
 import (
+	"bytes"
 	"fmt"
-	"runtime"
+	"reflect"
 	"time"
-	"unsafe"
 )
 
 // Message is the interface to an AMQP message.
@@ -124,11 +124,12 @@ type Message interface {
 
 	// Per-delivery annotations to provide delivery instructions.
 	// May be added or removed by intermediaries during delivery.
+	// See ApplicationProperties() for properties set by the application.
 	DeliveryAnnotations() map[AnnotationKey]interface{}
 	SetDeliveryAnnotations(map[AnnotationKey]interface{})
 
 	// Message annotations added as part of the bare message at creation, usually
-	// by an AMQP library. See ApplicationProperties() for adding application data.
+	// by an AMQP library. See ApplicationProperties() for properties set by the application.
 	MessageAnnotations() map[AnnotationKey]interface{}
 	SetMessageAnnotations(map[AnnotationKey]interface{})
 
@@ -141,15 +142,18 @@ type Message interface {
 	Inferred() bool
 	SetInferred(bool)
 
-	// Marshal a Go value into the message body. See amqp.Marshal() for details.
+	// Get the message body, using the amqp.Unmarshal() rules for interface{}
+	Body() interface{}
+
+	// Set the body using amqp.Marshal()
+	SetBody(interface{})
+
+	// Marshal a Go value into the message body, synonym for SetBody()
 	Marshal(interface{})
 
-	// Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
+	// Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal()
 	Unmarshal(interface{})
 
-	// Body value resulting from the default unmarshaling of message body as interface{}
-	Body() interface{}
-
 	// Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
 	// the message is encoded into it, otherwise a new buffer is created.
 	// Returns the buffer containing the message.
@@ -158,7 +162,7 @@ type Message interface {
 	// Decode data into this message. Overwrites an existing message content.
 	Decode(buffer []byte) error
 
-	// Clear the message contents.
+	// Clear the message contents, set all fields to the default value.
 	Clear()
 
 	// Copy the contents of another message to this one.
@@ -180,194 +184,205 @@ type Message interface {
 	String() string
 }
 
-type message struct{ pn *C.pn_message_t }
-
-func freeMessage(m *message) {
-	C.pn_message_free(m.pn)
-	m.pn = nil
-}
-
 // NewMessage creates a new message instance.
 func NewMessage() Message {
-	m := &message{C.pn_message()}
-	runtime.SetFinalizer(m, freeMessage)
+	m := &message{}
+	m.Clear()
 	return m
 }
 
-// NewMessageWith creates a message with value as the body. Equivalent to
-//     m := NewMessage(); m.Marshal(body)
+// NewMessageWith creates a message with value as the body.
 func NewMessageWith(value interface{}) Message {
 	m := NewMessage()
-	m.Marshal(value)
+	m.SetBody(value)
 	return m
 }
 
-func (m *message) Clear() { C.pn_message_clear(m.pn) }
-
-func (m *message) Copy(x Message) error {
-	if data, err := x.Encode(nil); err == nil {
-		return m.Decode(data)
-	} else {
-		return err
-	}
-}
-
-// ==== message get functions
-
-func rewindGet(data *C.pn_data_t) (v interface{}) {
-	C.pn_data_rewind(data)
-	C.pn_data_next(data)
-	unmarshal(&v, data)
-	return v
-}
-
-func (m *message) Inferred() bool  { return bool(C.pn_message_is_inferred(m.pn)) }
-func (m *message) Durable() bool   { return bool(C.pn_message_is_durable(m.pn)) }
-func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
-func (m *message) TTL() time.Duration {
-	return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
+// NewMessageCopy creates a copy of an existing message.
+func NewMessageCopy(m Message) Message {
+	m2 := NewMessage()
+	m2.Copy(m)
+	return m2
 }
-func (m *message) FirstAcquirer() bool        { return bool(C.pn_message_is_first_acquirer(m.pn)) }
-func (m *message) DeliveryCount() uint32      { return uint32(C.pn_message_get_delivery_count(m.pn)) }
-func (m *message) MessageId() interface{}     { return rewindGet(C.pn_message_id(m.pn)) }
-func (m *message) UserId() string             { return goString(C.pn_message_get_user_id(m.pn)) }
-func (m *message) Address() string            { return C.GoString(C.pn_message_get_address(m.pn)) }
-func (m *message) Subject() string            { return C.GoString(C.pn_message_get_subject(m.pn)) }
-func (m *message) ReplyTo() string            { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
-func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
-func (m *message) ContentType() string        { return C.GoString(C.pn_message_get_content_type(m.pn)) }
-func (m *message) ContentEncoding() string    { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
 
-func (m *message) ExpiryTime() time.Time {
-	return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
-}
-func (m *message) CreationTime() time.Time {
-	return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
-}
-func (m *message) GroupId() string        { return C.GoString(C.pn_message_get_group_id(m.pn)) }
-func (m *message) GroupSequence() int32   { return int32(C.pn_message_get_group_sequence(m.pn)) }
-func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
+// Reset message to all default values
+func (m *message) Clear() { *m = message{priority: 4} }
 
-func getAnnotations(data *C.pn_data_t) (v map[AnnotationKey]interface{}) {
-	if C.pn_data_size(data) > 0 {
-		C.pn_data_rewind(data)
-		C.pn_data_next(data)
-		unmarshal(&v, data)
+// Copy makes a deep copy of message x
+func (m *message) Copy(x Message) error {
+	var mc MessageCodec
+	bytes, err := mc.Encode(x, nil)
+	if err == nil {
+		err = mc.Decode(m, bytes)
 	}
-	return v
-}
+	return err
+}
+
+type message struct {
+	address               string
+	applicationProperties map[string]interface{}
+	contentEncoding       string
+	contentType           string
+	correlationId         interface{}
+	creationTime          time.Time
+	deliveryAnnotations   map[AnnotationKey]interface{}
+	deliveryCount         uint32
+	durable               bool
+	expiryTime            time.Time
+	firstAcquirer         bool
+	groupId               string
+	groupSequence         int32
+	inferred              bool
+	messageAnnotations    map[AnnotationKey]interface{}
+	messageId             interface{}
+	priority              uint8
+	replyTo               string
+	replyToGroupId        string
+	subject               string
+	ttl                   time.Duration
+	userId                string
+	body                  interface{}
+	// Keep the original data to support Unmarshal to a non-interface{} type
+	// Waste of memory, consider deprecating or making it optional.
+	pnBody *C.pn_data_t
+}
+
+// ==== message get methods
+func (m *message) Body() interface{}          { return m.body }
+func (m *message) Inferred() bool             { return m.inferred }
+func (m *message) Durable() bool              { return m.durable }
+func (m *message) Priority() uint8            { return m.priority }
+func (m *message) TTL() time.Duration         { return m.ttl }
+func (m *message) FirstAcquirer() bool        { return m.firstAcquirer }
+func (m *message) DeliveryCount() uint32      { return m.deliveryCount }
+func (m *message) MessageId() interface{}     { return m.messageId }
+func (m *message) UserId() string             { return m.userId }
+func (m *message) Address() string            { return m.address }
+func (m *message) Subject() string            { return m.subject }
+func (m *message) ReplyTo() string            { return m.replyTo }
+func (m *message) CorrelationId() interface{} { return m.correlationId }
+func (m *message) ContentType() string        { return m.contentType }
+func (m *message) ContentEncoding() string    { return m.contentEncoding }
+func (m *message) ExpiryTime() time.Time      { return m.expiryTime }
+func (m *message) CreationTime() time.Time    { return m.creationTime }
+func (m *message) GroupId() string            { return m.groupId }
+func (m *message) GroupSequence() int32       { return m.groupSequence }
+func (m *message) ReplyToGroupId() string     { return m.replyToGroupId }
 
 func (m *message) DeliveryAnnotations() map[AnnotationKey]interface{} {
-	return getAnnotations(C.pn_message_instructions(m.pn))
+	if m.deliveryAnnotations == nil {
+		m.deliveryAnnotations = make(map[AnnotationKey]interface{})
+	}
+	return m.deliveryAnnotations
 }
 func (m *message) MessageAnnotations() map[AnnotationKey]interface{} {
-	return getAnnotations(C.pn_message_annotations(m.pn))
+	if m.messageAnnotations == nil {
+		m.messageAnnotations = make(map[AnnotationKey]interface{})
+	}
+	return m.messageAnnotations
 }
-
 func (m *message) ApplicationProperties() map[string]interface{} {
-	var v map[string]interface{}
-	data := C.pn_message_properties(m.pn)
-	if C.pn_data_size(data) > 0 {
-		C.pn_data_rewind(data)
-		C.pn_data_next(data)
-		unmarshal(&v, data)
+	if m.applicationProperties == nil {
+		m.applicationProperties = make(map[string]interface{})
 	}
-	return v
+	return m.applicationProperties
 }
 
 // ==== message set methods
 
-func setData(v interface{}, data *C.pn_data_t) {
-	C.pn_data_clear(data)
-	marshal(v, data)
-}
+func (m *message) SetBody(v interface{})          { m.body = v }
+func (m *message) SetInferred(x bool)             { m.inferred = x }
+func (m *message) SetDurable(x bool)              { m.durable = x }
+func (m *message) SetPriority(x uint8)            { m.priority = x }
+func (m *message) SetTTL(x time.Duration)         { m.ttl = x }
+func (m *message) SetFirstAcquirer(x bool)        { m.firstAcquirer = x }
+func (m *message) SetDeliveryCount(x uint32)      { m.deliveryCount = x }
+func (m *message) SetMessageId(x interface{})     { m.messageId = x }
+func (m *message) SetUserId(x string)             { m.userId = x }
+func (m *message) SetAddress(x string)            { m.address = x }
+func (m *message) SetSubject(x string)            { m.subject = x }
+func (m *message) SetReplyTo(x string)            { m.replyTo = x }
+func (m *message) SetCorrelationId(x interface{}) { m.correlationId = x }
+func (m *message) SetContentType(x string)        { m.contentType = x }
+func (m *message) SetContentEncoding(x string)    { m.contentEncoding = x }
+func (m *message) SetExpiryTime(x time.Time)      { m.expiryTime = x }
+func (m *message) SetCreationTime(x time.Time)    { m.creationTime = x }
+func (m *message) SetGroupId(x string)            { m.groupId = x }
+func (m *message) SetGroupSequence(x int32)       { m.groupSequence = x }
+func (m *message) SetReplyToGroupId(x string)     { m.replyToGroupId = x }
+
+func (m *message) SetDeliveryAnnotations(x map[AnnotationKey]interface{}) {
+	m.deliveryAnnotations = x
+}
+func (m *message) SetMessageAnnotations(x map[AnnotationKey]interface{}) {
+	m.messageAnnotations = x
+}
+func (m *message) SetApplicationProperties(x map[string]interface{}) {
+	m.applicationProperties = x
+}
+
+// Marshal body from v, same as SetBody(v). See amqp.Marshal.
+func (m *message) Marshal(v interface{}) { m.body = v }
 
-func (m *message) SetInferred(b bool)  { C.pn_message_set_inferred(m.pn, C.bool(b)) }
-func (m *message) SetDurable(b bool)   { C.pn_message_set_durable(m.pn, C.bool(b)) }
-func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
-func (m *message) SetTTL(d time.Duration) {
-	C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
-}
-func (m *message) SetFirstAcquirer(b bool)     { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
-func (m *message) SetDeliveryCount(c uint32)   { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
-func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
-func (m *message) SetUserId(s string)          { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
-func (m *message) SetAddress(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
-}
-func (m *message) SetSubject(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
-}
-func (m *message) SetReplyTo(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
-}
-func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
-func (m *message) SetContentType(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
-}
-func (m *message) SetContentEncoding(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
-}
-func (m *message) SetExpiryTime(t time.Time)   { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
-func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
-func (m *message) SetGroupId(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
-}
-func (m *message) SetGroupSequence(s int32) {
-	C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
-}
-func (m *message) SetReplyToGroupId(s string) {
-	C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
+func (m *message) Unmarshal(v interface{}) {
+	// FIXME aconway 2018-09-28: this is inefficient, replace with a
+	// reflective conversion from the existing body value that respects
+	// the Unmarshal() rules.
+	pnData := C.pn_data(2)
+	marshal(m.body, pnData)
+	unmarshal(v, pnData)
 }
 
-func (m *message) SetDeliveryAnnotations(v map[AnnotationKey]interface{}) {
-	setData(v, C.pn_message_instructions(m.pn))
-}
-func (m *message) SetMessageAnnotations(v map[AnnotationKey]interface{}) {
-	setData(v, C.pn_message_annotations(m.pn))
+// Internal use only
+type MessageCodec struct {
+	pn *C.pn_message_t // Cache a pn_message_t to speed up encode/decode
+	// Optionally remember a byte buffer to use with MessageCodec methods.
+	Buffer []byte
 }
-func (m *message) SetApplicationProperties(v map[string]interface{}) {
-	setData(v, C.pn_message_properties(m.pn))
-}
-
-// Marshal body from v
-func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) }
 
-// Unmarshal body to v, which must be a pointer to a value. See amqp.Unmarshal
-func (m *message) Unmarshal(v interface{}) {
-	data := C.pn_message_body(m.pn)
-	if C.pn_data_size(data) > 0 {
-		C.pn_data_rewind(data)
-		C.pn_data_next(data)
-		unmarshal(v, data)
+func (mc *MessageCodec) pnMessage() *C.pn_message_t {
+	if mc.pn == nil {
+		mc.pn = C.pn_message()
 	}
-	return
+	return mc.pn
 }
 
-// Return the body value as an interface
-func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
-
-func (m *message) Decode(data []byte) error {
-	m.Clear()
-	if len(data) == 0 {
-		return fmt.Errorf("empty buffer for decode")
+func (mc *MessageCodec) Close() {
+	if mc.pn != nil {
+		C.pn_message_free(mc.pn)
+		mc.pn = nil
 	}
-	if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
-		return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn)))
+}
+
+func (mc *MessageCodec) Decode(m Message, data []byte) error {
+	pn := mc.pnMessage()
+	if C.pn_message_decode(pn, cPtr(data), cLen(data)) < 0 {
+		return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(pn)))
 	}
+	m.(*message).get(pn)
 	return nil
 }
 
+func (m *message) Decode(data []byte) error {
+	var mc MessageCodec
+	defer mc.Close()
+	return mc.Decode(m, data)
+}
+
 func DecodeMessage(data []byte) (m Message, err error) {
 	m = NewMessage()
 	err = m.Decode(data)
 	return
 }
 
-func (m *message) Encode(buffer []byte) ([]byte, error) {
+// Encode m using buffer. Return the final buffer used to hold m,
+// may be different if the initial buffer was not large enough.
+func (mc *MessageCodec) Encode(m Message, buffer []byte) ([]byte, error) {
+	pn := mc.pnMessage()
+	m.(*message).put(pn)
 	encode := func(buf []byte) ([]byte, error) {
 		len := cLen(buf)
-		result := C.pn_message_encode(m.pn, cPtr(buf), &len)
+		result := C.pn_message_encode(pn, cPtr(buf), &len)
 		switch {
 		case result == C.PN_OVERFLOW:
 			return buf, overflow
@@ -380,50 +395,214 @@ func (m *message) Encode(buffer []byte) ([]byte, error) {
 	return encodeGrow(buffer, encode)
 }
 
+func (m *message) Encode(buffer []byte) ([]byte, error) {
+	var mc MessageCodec
+	defer mc.Close()
+	return mc.Encode(m, buffer)
+}
+
 // TODO aconway 2015-09-14: Multi-section messages.
 
-func (m *message) String() string {
-	str := C.pn_string(C.CString(""))
-	defer C.pn_free(unsafe.Pointer(str))
-	C.pn_inspect(unsafe.Pointer(m.pn), str)
-	return C.GoString(C.pn_string_get(str))
+type ignoreFunc func(v interface{}) bool
+
+func isNil(v interface{}) bool   { return v == nil }
+func isZero(v interface{}) bool  { return v == reflect.Zero(reflect.TypeOf(v)).Interface() }
+func isEmpty(v interface{}) bool { return reflect.ValueOf(v).Len() == 0 }
+
+type stringBuilder struct {
+	bytes.Buffer
+	separator string
 }
 
-// ==== Deprecated functions
-func oldGetAnnotations(data *C.pn_data_t) (v map[string]interface{}) {
-	if C.pn_data_size(data) > 0 {
+func (b *stringBuilder) field(name string, value interface{}, ignore ignoreFunc) {
+	if !ignore(value) {
+		b.WriteString(b.separator)
+		b.separator = ", "
+		b.WriteString(name)
+		b.WriteString(": ")
+		fmt.Fprintf(&b.Buffer, "%v", value)
+	}
+}
+
+// Human-readable string describing message.
+// Includes only message fields with non-default values.
+func (m *message) String() string {
+	var b stringBuilder
+	b.WriteString("Message{")
+	b.field("address", m.address, isEmpty)
+	b.field("durable", m.durable, isZero)
+	// Priority has weird default
+	b.field("priority", m.priority, func(v interface{}) bool { return v.(uint8) == 4 })
+	b.field("ttl", m.ttl, isZero)
+	b.field("first-acquirer", m.firstAcquirer, isZero)
+	b.field("delivery-count", m.deliveryCount, isZero)
+	b.field("message-id", m.messageId, isNil)
+	b.field("user-id", m.userId, isEmpty)
+	b.field("subject", m.subject, isEmpty)
+	b.field("reply-to", m.replyTo, isEmpty)
+	b.field("correlation-id", m.correlationId, isNil)
+	b.field("content-type", m.contentType, isEmpty)
+	b.field("content-encoding", m.contentEncoding, isEmpty)
+	b.field("expiry-time", m.expiryTime, isZero)
+	b.field("creation-time", m.creationTime, isZero)
+	b.field("group-id", m.groupId, isEmpty)
+	b.field("group-sequence", m.groupSequence, isZero)
+	b.field("reply-to-group-id", m.replyToGroupId, isEmpty)
+	b.field("inferred", m.inferred, isZero)
+	b.field("delivery-annotations", m.deliveryAnnotations, isEmpty)
+	b.field("message-annotations", m.messageAnnotations, isEmpty)
+	b.field("application-properties", m.applicationProperties, isEmpty)
+	b.field("body", m.body, isNil)
+	b.WriteString("}")
+	return b.String()
+}
+
+// ==== get message from pn_message_t
+
+func getData(v interface{}, data *C.pn_data_t) {
+	if data != nil && C.pn_data_size(data) > 0 {
 		C.pn_data_rewind(data)
 		C.pn_data_next(data)
-		unmarshal(&v, data)
+		unmarshal(v, data)
+	}
+	return
+}
+
+func getString(c *C.char) string {
+	if c == nil {
+		return ""
+	}
+	return C.GoString(c)
+}
+
+func (m *message) get(pn *C.pn_message_t) {
+	m.Clear()
+	m.inferred = bool(C.pn_message_is_inferred(pn))
+	m.durable = bool(C.pn_message_is_durable(pn))
+	m.priority = uint8(C.pn_message_get_priority(pn))
+	m.ttl = goDuration(C.pn_message_get_ttl(pn))
+	m.firstAcquirer = bool(C.pn_message_is_first_acquirer(pn))
+	m.deliveryCount = uint32(C.pn_message_get_delivery_count(pn))
+	getData(&m.messageId, C.pn_message_id(pn))
+	m.userId = string(goBytes(C.pn_message_get_user_id(pn)))
+	m.address = getString(C.pn_message_get_address(pn))
+	m.subject = getString(C.pn_message_get_subject(pn))
+	m.replyTo = getString(C.pn_message_get_reply_to(pn))
+	getData(&m.correlationId, C.pn_message_correlation_id(pn))
+	m.contentType = getString(C.pn_message_get_content_type(pn))
+	m.contentEncoding = getString(C.pn_message_get_content_encoding(pn))
+	m.expiryTime = goTime(C.pn_message_get_expiry_time(pn))
+	m.creationTime = goTime(C.pn_message_get_creation_time(pn))
+	m.groupId = getString(C.pn_message_get_group_id(pn))
+	m.groupSequence = int32(C.pn_message_get_group_sequence(pn))
+	m.replyToGroupId = getString(C.pn_message_get_reply_to_group_id(pn))
+	getData(&m.deliveryAnnotations, C.pn_message_instructions(pn))
+	getData(&m.messageAnnotations, C.pn_message_annotations(pn))
+	getData(&m.applicationProperties, C.pn_message_properties(pn))
+	getData(&m.body, C.pn_message_body(pn))
+}
+
+// ==== put message to pn_message_t
+
+func putData(v interface{}, pn *C.pn_data_t) {
+	if v != nil {
+		C.pn_data_clear(pn)
+		marshal(v, pn)
+	}
+}
+
+// For pointer-based fields (pn_data_t, strings, bytes) only
+// put a field if it has a non-empty value
+func (m *message) put(pn *C.pn_message_t) {
+	C.pn_message_clear(pn)
+	C.pn_message_set_inferred(pn, C.bool(m.inferred))
+	C.pn_message_set_durable(pn, C.bool(m.durable))
+	C.pn_message_set_priority(pn, C.uint8_t(m.priority))
+	C.pn_message_set_ttl(pn, pnDuration(m.ttl))
+	C.pn_message_set_first_acquirer(pn, C.bool(m.firstAcquirer))
+	C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount))
+	putData(m.messageId, C.pn_message_id(pn))
+	if m.userId != "" {
+		C.pn_message_set_user_id(pn, pnBytes(([]byte)(m.userId)))
+	}
+	if m.address != "" {
+		C.pn_message_set_address(pn, C.CString(m.address))
+	}
+	if m.subject != "" {
+		C.pn_message_set_subject(pn, C.CString(m.subject))
+	}
+	if m.replyTo != "" {
+		C.pn_message_set_reply_to(pn, C.CString(m.replyTo))
+	}
+	putData(m.correlationId, C.pn_message_correlation_id(pn))
+	if m.contentType != "" {
+		C.pn_message_set_content_type(pn, C.CString(m.contentType))
+	}
+	if m.contentEncoding != "" {
+		C.pn_message_set_content_encoding(pn, C.CString(m.contentEncoding))
+	}
+	C.pn_message_set_expiry_time(pn, pnTime(m.expiryTime))
+	C.pn_message_set_creation_time(pn, pnTime(m.creationTime))
+	if m.groupId != "" {
+		C.pn_message_set_group_id(pn, C.CString(m.groupId))
 	}
-	return v
+	C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.groupSequence))
+	if m.replyToGroupId != "" {
+		C.pn_message_set_reply_to_group_id(pn, C.CString(m.replyToGroupId))
+	}
+	if len(m.deliveryAnnotations) != 0 {
+		putData(m.deliveryAnnotations, C.pn_message_instructions(pn))
+	}
+	if len(m.messageAnnotations) != 0 {
+		putData(m.messageAnnotations, C.pn_message_annotations(pn))
+	}
+	if len(m.applicationProperties) != 0 {
+		putData(m.applicationProperties, C.pn_message_properties(pn))
+	}
+	putData(m.body, C.pn_message_body(pn))
+}
+
+// ==== Deprecated functions
+
+func oldAnnotations(in map[AnnotationKey]interface{}) (out map[string]interface{}) {
+	if len(in) == 0 {
+		return nil
+	}
+	out = make(map[string]interface{})
+	for k, v := range in {
+		out[k.String()] = v
+	}
+	return
 }
 
 func (m *message) Instructions() map[string]interface{} {
-	return oldGetAnnotations(C.pn_message_instructions(m.pn))
+	return oldAnnotations(m.deliveryAnnotations)
 }
 func (m *message) Annotations() map[string]interface{} {
-	return oldGetAnnotations(C.pn_message_annotations(m.pn))
+	return oldAnnotations(m.messageAnnotations)
 }
 func (m *message) Properties() map[string]interface{} {
-	return oldGetAnnotations(C.pn_message_properties(m.pn))
+	return m.applicationProperties
 }
 
 // Convert old string-keyed annotations to an AnnotationKey map
-func fixAnnotations(old map[string]interface{}) (annotations map[AnnotationKey]interface{}) {
-	annotations = make(map[AnnotationKey]interface{})
-	for k, v := range old {
-		annotations[AnnotationKeyString(k)] = v
+func newAnnotations(in map[string]interface{}) (out map[AnnotationKey]interface{}) {
+	if len(in) == 0 {
+		return nil
+	}
+	out = make(map[AnnotationKey]interface{})
+	for k, v := range in {
+		out[AnnotationKeyString(k)] = v
 	}
 	return
 }
 
 func (m *message) SetInstructions(v map[string]interface{}) {
-	setData(fixAnnotations(v), C.pn_message_instructions(m.pn))
+	m.deliveryAnnotations = newAnnotations(v)
 }
 func (m *message) SetAnnotations(v map[string]interface{}) {
-	setData(fixAnnotations(v), C.pn_message_annotations(m.pn))
+	m.messageAnnotations = newAnnotations(v)
 }
 func (m *message) SetProperties(v map[string]interface{}) {
-	setData(fixAnnotations(v), C.pn_message_properties(m.pn))
+	m.applicationProperties = v
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go
index b22c60d..668ca49 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -58,14 +58,14 @@ func TestDefaultMessage(t *testing.T) {
 		{"ReplyToGroupId", ""},
 		{"MessageId", nil},
 		{"CorrelationId", nil},
-		{"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
-		{"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
-		{"ApplicationProperties", map[string]interface{}(nil)},
+		{"DeliveryAnnotations", map[AnnotationKey]interface{}{}},
+		{"MessageAnnotations", map[AnnotationKey]interface{}{}},
+		{"ApplicationProperties", map[string]interface{}{}},
 
 		// Deprecated
 		{"Instructions", map[string]interface{}(nil)},
 		{"Annotations", map[string]interface{}(nil)},
-		{"Properties", map[string]interface{}(nil)},
+		{"Properties", map[string]interface{}{}},
 		{"Body", nil},
 	} {
 		ret := mv.MethodByName(x.method).Call(nil)
@@ -91,7 +91,7 @@ func TestMessageString(t *testing.T) {
 	if err := roundTrip(m); err != nil {
 		t.Error(err)
 	}
-	msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
+	msgstr := "Message{user-id: user, delivery-annotations: map[instructions:foo], message-annotations: map[annotations:bar], application-properties: map[int:32], body: hello}"
 	if err := checkEqual(msgstr, m.String()); err != nil {
 		t.Error(err)
 	}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org