You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/10/11 22:21:43 UTC
[11/12] qpid-proton git commit: PROTON-1910: [go] native Message
implementation
PROTON-1910: [go] native Message implementation
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/886d2b93
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/886d2b93
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/886d2b93
Branch: refs/heads/master
Commit: 886d2b9349f74c02219b29990ee04278124f5224
Parents: ef716fa
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 11 15:20:23 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:20:23 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/amqp/message.go | 521 +++++++++++++++--------
go/src/qpid.apache.org/amqp/message_test.go | 10 +-
2 files changed, 355 insertions(+), 176 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message.go b/go/src/qpid.apache.org/amqp/message.go
index e514b26..919904c 100644
--- a/go/src/qpid.apache.org/amqp/message.go
+++ b/go/src/qpid.apache.org/amqp/message.go
@@ -35,10 +35,10 @@ package amqp
import "C"
import (
+ "bytes"
"fmt"
- "runtime"
+ "reflect"
"time"
- "unsafe"
)
// Message is the interface to an AMQP message.
@@ -124,11 +124,12 @@ type Message interface {
// Per-delivery annotations to provide delivery instructions.
// May be added or removed by intermediaries during delivery.
+ // See ApplicationProperties() for properties set by the application.
DeliveryAnnotations() map[AnnotationKey]interface{}
SetDeliveryAnnotations(map[AnnotationKey]interface{})
// Message annotations added as part of the bare message at creation, usually
- // by an AMQP library. See ApplicationProperties() for adding application data.
+ // by an AMQP library. See ApplicationProperties() for properties set by the application.
MessageAnnotations() map[AnnotationKey]interface{}
SetMessageAnnotations(map[AnnotationKey]interface{})
@@ -141,15 +142,18 @@ type Message interface {
Inferred() bool
SetInferred(bool)
- // Marshal a Go value into the message body. See amqp.Marshal() for details.
+ // Get the message body, using the amqp.Unmarshal() rules for interface{}
+ Body() interface{}
+
+ // Set the body using amqp.Marshal()
+ SetBody(interface{})
+
+ // Marshal a Go value into the message body, synonym for SetBody()
Marshal(interface{})
- // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal() for details.
+ // Unmarshal the message body into the value pointed to by v. See amqp.Unmarshal()
Unmarshal(interface{})
- // Body value resulting from the default unmarshaling of message body as interface{}
- Body() interface{}
-
// Encode encodes the message as AMQP data. If buffer is non-nil and is large enough
// the message is encoded into it, otherwise a new buffer is created.
// Returns the buffer containing the message.
@@ -158,7 +162,7 @@ type Message interface {
// Decode data into this message. Overwrites an existing message content.
Decode(buffer []byte) error
- // Clear the message contents.
+ // Clear the message contents, set all fields to the default value.
Clear()
// Copy the contents of another message to this one.
@@ -180,194 +184,205 @@ type Message interface {
String() string
}
-type message struct{ pn *C.pn_message_t }
-
-func freeMessage(m *message) {
- C.pn_message_free(m.pn)
- m.pn = nil
-}
-
// NewMessage creates a new message instance.
func NewMessage() Message {
- m := &message{C.pn_message()}
- runtime.SetFinalizer(m, freeMessage)
+ m := &message{}
+ m.Clear()
return m
}
-// NewMessageWith creates a message with value as the body. Equivalent to
-// m := NewMessage(); m.Marshal(body)
+// NewMessageWith creates a message with value as the body.
func NewMessageWith(value interface{}) Message {
m := NewMessage()
- m.Marshal(value)
+ m.SetBody(value)
return m
}
-func (m *message) Clear() { C.pn_message_clear(m.pn) }
-
-func (m *message) Copy(x Message) error {
- if data, err := x.Encode(nil); err == nil {
- return m.Decode(data)
- } else {
- return err
- }
-}
-
-// ==== message get functions
-
-func rewindGet(data *C.pn_data_t) (v interface{}) {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(&v, data)
- return v
-}
-
-func (m *message) Inferred() bool { return bool(C.pn_message_is_inferred(m.pn)) }
-func (m *message) Durable() bool { return bool(C.pn_message_is_durable(m.pn)) }
-func (m *message) Priority() uint8 { return uint8(C.pn_message_get_priority(m.pn)) }
-func (m *message) TTL() time.Duration {
- return time.Duration(C.pn_message_get_ttl(m.pn)) * time.Millisecond
+// NewMessageCopy creates a copy of an existing message.
+func NewMessageCopy(m Message) Message {
+ m2 := NewMessage()
+ m2.Copy(m)
+ return m2
}
-func (m *message) FirstAcquirer() bool { return bool(C.pn_message_is_first_acquirer(m.pn)) }
-func (m *message) DeliveryCount() uint32 { return uint32(C.pn_message_get_delivery_count(m.pn)) }
-func (m *message) MessageId() interface{} { return rewindGet(C.pn_message_id(m.pn)) }
-func (m *message) UserId() string { return goString(C.pn_message_get_user_id(m.pn)) }
-func (m *message) Address() string { return C.GoString(C.pn_message_get_address(m.pn)) }
-func (m *message) Subject() string { return C.GoString(C.pn_message_get_subject(m.pn)) }
-func (m *message) ReplyTo() string { return C.GoString(C.pn_message_get_reply_to(m.pn)) }
-func (m *message) CorrelationId() interface{} { return rewindGet(C.pn_message_correlation_id(m.pn)) }
-func (m *message) ContentType() string { return C.GoString(C.pn_message_get_content_type(m.pn)) }
-func (m *message) ContentEncoding() string { return C.GoString(C.pn_message_get_content_encoding(m.pn)) }
-func (m *message) ExpiryTime() time.Time {
- return time.Unix(0, int64(time.Millisecond*time.Duration(C.pn_message_get_expiry_time(m.pn))))
-}
-func (m *message) CreationTime() time.Time {
- return time.Unix(0, int64(time.Millisecond)*int64(C.pn_message_get_creation_time(m.pn)))
-}
-func (m *message) GroupId() string { return C.GoString(C.pn_message_get_group_id(m.pn)) }
-func (m *message) GroupSequence() int32 { return int32(C.pn_message_get_group_sequence(m.pn)) }
-func (m *message) ReplyToGroupId() string { return C.GoString(C.pn_message_get_reply_to_group_id(m.pn)) }
+// Reset message to all default values
+func (m *message) Clear() { *m = message{priority: 4} }
-func getAnnotations(data *C.pn_data_t) (v map[AnnotationKey]interface{}) {
- if C.pn_data_size(data) > 0 {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(&v, data)
+// Copy makes a deep copy of message x
+func (m *message) Copy(x Message) error {
+ var mc MessageCodec
+ bytes, err := mc.Encode(x, nil)
+ if err == nil {
+ err = mc.Decode(m, bytes)
}
- return v
-}
+ return err
+}
+
+type message struct {
+ address string
+ applicationProperties map[string]interface{}
+ contentEncoding string
+ contentType string
+ correlationId interface{}
+ creationTime time.Time
+ deliveryAnnotations map[AnnotationKey]interface{}
+ deliveryCount uint32
+ durable bool
+ expiryTime time.Time
+ firstAcquirer bool
+ groupId string
+ groupSequence int32
+ inferred bool
+ messageAnnotations map[AnnotationKey]interface{}
+ messageId interface{}
+ priority uint8
+ replyTo string
+ replyToGroupId string
+ subject string
+ ttl time.Duration
+ userId string
+ body interface{}
+ // Keep the original data to support Unmarshal to a non-interface{} type
+ // Waste of memory, consider deprecating or making it optional.
+ pnBody *C.pn_data_t
+}
+
+// ==== message get methods
+func (m *message) Body() interface{} { return m.body }
+func (m *message) Inferred() bool { return m.inferred }
+func (m *message) Durable() bool { return m.durable }
+func (m *message) Priority() uint8 { return m.priority }
+func (m *message) TTL() time.Duration { return m.ttl }
+func (m *message) FirstAcquirer() bool { return m.firstAcquirer }
+func (m *message) DeliveryCount() uint32 { return m.deliveryCount }
+func (m *message) MessageId() interface{} { return m.messageId }
+func (m *message) UserId() string { return m.userId }
+func (m *message) Address() string { return m.address }
+func (m *message) Subject() string { return m.subject }
+func (m *message) ReplyTo() string { return m.replyTo }
+func (m *message) CorrelationId() interface{} { return m.correlationId }
+func (m *message) ContentType() string { return m.contentType }
+func (m *message) ContentEncoding() string { return m.contentEncoding }
+func (m *message) ExpiryTime() time.Time { return m.expiryTime }
+func (m *message) CreationTime() time.Time { return m.creationTime }
+func (m *message) GroupId() string { return m.groupId }
+func (m *message) GroupSequence() int32 { return m.groupSequence }
+func (m *message) ReplyToGroupId() string { return m.replyToGroupId }
func (m *message) DeliveryAnnotations() map[AnnotationKey]interface{} {
- return getAnnotations(C.pn_message_instructions(m.pn))
+ if m.deliveryAnnotations == nil {
+ m.deliveryAnnotations = make(map[AnnotationKey]interface{})
+ }
+ return m.deliveryAnnotations
}
func (m *message) MessageAnnotations() map[AnnotationKey]interface{} {
- return getAnnotations(C.pn_message_annotations(m.pn))
+ if m.messageAnnotations == nil {
+ m.messageAnnotations = make(map[AnnotationKey]interface{})
+ }
+ return m.messageAnnotations
}
-
func (m *message) ApplicationProperties() map[string]interface{} {
- var v map[string]interface{}
- data := C.pn_message_properties(m.pn)
- if C.pn_data_size(data) > 0 {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(&v, data)
+ if m.applicationProperties == nil {
+ m.applicationProperties = make(map[string]interface{})
}
- return v
+ return m.applicationProperties
}
// ==== message set methods
-func setData(v interface{}, data *C.pn_data_t) {
- C.pn_data_clear(data)
- marshal(v, data)
-}
+func (m *message) SetBody(v interface{}) { m.body = v }
+func (m *message) SetInferred(x bool) { m.inferred = x }
+func (m *message) SetDurable(x bool) { m.durable = x }
+func (m *message) SetPriority(x uint8) { m.priority = x }
+func (m *message) SetTTL(x time.Duration) { m.ttl = x }
+func (m *message) SetFirstAcquirer(x bool) { m.firstAcquirer = x }
+func (m *message) SetDeliveryCount(x uint32) { m.deliveryCount = x }
+func (m *message) SetMessageId(x interface{}) { m.messageId = x }
+func (m *message) SetUserId(x string) { m.userId = x }
+func (m *message) SetAddress(x string) { m.address = x }
+func (m *message) SetSubject(x string) { m.subject = x }
+func (m *message) SetReplyTo(x string) { m.replyTo = x }
+func (m *message) SetCorrelationId(x interface{}) { m.correlationId = x }
+func (m *message) SetContentType(x string) { m.contentType = x }
+func (m *message) SetContentEncoding(x string) { m.contentEncoding = x }
+func (m *message) SetExpiryTime(x time.Time) { m.expiryTime = x }
+func (m *message) SetCreationTime(x time.Time) { m.creationTime = x }
+func (m *message) SetGroupId(x string) { m.groupId = x }
+func (m *message) SetGroupSequence(x int32) { m.groupSequence = x }
+func (m *message) SetReplyToGroupId(x string) { m.replyToGroupId = x }
+
+func (m *message) SetDeliveryAnnotations(x map[AnnotationKey]interface{}) {
+ m.deliveryAnnotations = x
+}
+func (m *message) SetMessageAnnotations(x map[AnnotationKey]interface{}) {
+ m.messageAnnotations = x
+}
+func (m *message) SetApplicationProperties(x map[string]interface{}) {
+ m.applicationProperties = x
+}
+
+// Marshal body from v, same as SetBody(v). See amqp.Marshal.
+func (m *message) Marshal(v interface{}) { m.body = v }
-func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(b)) }
-func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) }
-func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
-func (m *message) SetTTL(d time.Duration) {
- C.pn_message_set_ttl(m.pn, C.pn_millis_t(d/time.Millisecond))
-}
-func (m *message) SetFirstAcquirer(b bool) { C.pn_message_set_first_acquirer(m.pn, C.bool(b)) }
-func (m *message) SetDeliveryCount(c uint32) { C.pn_message_set_delivery_count(m.pn, C.uint32_t(c)) }
-func (m *message) SetMessageId(id interface{}) { setData(id, C.pn_message_id(m.pn)) }
-func (m *message) SetUserId(s string) { C.pn_message_set_user_id(m.pn, pnBytes(([]byte)(s))) }
-func (m *message) SetAddress(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_address))
-}
-func (m *message) SetSubject(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_subject))
-}
-func (m *message) SetReplyTo(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to))
-}
-func (m *message) SetCorrelationId(c interface{}) { setData(c, C.pn_message_correlation_id(m.pn)) }
-func (m *message) SetContentType(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_type))
-}
-func (m *message) SetContentEncoding(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_content_encoding))
-}
-func (m *message) SetExpiryTime(t time.Time) { C.pn_message_set_expiry_time(m.pn, pnTime(t)) }
-func (m *message) SetCreationTime(t time.Time) { C.pn_message_set_creation_time(m.pn, pnTime(t)) }
-func (m *message) SetGroupId(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_group_id))
-}
-func (m *message) SetGroupSequence(s int32) {
- C.pn_message_set_group_sequence(m.pn, C.pn_sequence_t(s))
-}
-func (m *message) SetReplyToGroupId(s string) {
- C.msg_set_str(m.pn, C.CString(s), C.set_fn(C.pn_message_set_reply_to_group_id))
+func (m *message) Unmarshal(v interface{}) {
+ // FIXME aconway 2018-09-28: this is inefficient, replace with a
+ // reflective conversion from the existing body value that respects
+ // the Unmarshal() rules.
+ pnData := C.pn_data(2)
+ marshal(m.body, pnData)
+ unmarshal(v, pnData)
}
-func (m *message) SetDeliveryAnnotations(v map[AnnotationKey]interface{}) {
- setData(v, C.pn_message_instructions(m.pn))
-}
-func (m *message) SetMessageAnnotations(v map[AnnotationKey]interface{}) {
- setData(v, C.pn_message_annotations(m.pn))
+// Internal use only
+type MessageCodec struct {
+ pn *C.pn_message_t // Cache a pn_message_t to speed up encode/decode
+ // Optionally remember a byte buffer to use with MessageCodec methods.
+ Buffer []byte
}
-func (m *message) SetApplicationProperties(v map[string]interface{}) {
- setData(v, C.pn_message_properties(m.pn))
-}
-
-// Marshal body from v
-func (m *message) Marshal(v interface{}) { clearMarshal(v, C.pn_message_body(m.pn)) }
-// Unmarshal body to v, which must be a pointer to a value. See amqp.Unmarshal
-func (m *message) Unmarshal(v interface{}) {
- data := C.pn_message_body(m.pn)
- if C.pn_data_size(data) > 0 {
- C.pn_data_rewind(data)
- C.pn_data_next(data)
- unmarshal(v, data)
+func (mc *MessageCodec) pnMessage() *C.pn_message_t {
+ if mc.pn == nil {
+ mc.pn = C.pn_message()
}
- return
+ return mc.pn
}
-// Return the body value as an interface
-func (m *message) Body() (v interface{}) { m.Unmarshal(&v); return }
-
-func (m *message) Decode(data []byte) error {
- m.Clear()
- if len(data) == 0 {
- return fmt.Errorf("empty buffer for decode")
+func (mc *MessageCodec) Close() {
+ if mc.pn != nil {
+ C.pn_message_free(mc.pn)
+ mc.pn = nil
}
- if C.pn_message_decode(m.pn, cPtr(data), cLen(data)) < 0 {
- return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(m.pn)))
+}
+
+func (mc *MessageCodec) Decode(m Message, data []byte) error {
+ pn := mc.pnMessage()
+ if C.pn_message_decode(pn, cPtr(data), cLen(data)) < 0 {
+ return fmt.Errorf("decoding message: %s", PnError(C.pn_message_error(pn)))
}
+ m.(*message).get(pn)
return nil
}
+func (m *message) Decode(data []byte) error {
+ var mc MessageCodec
+ defer mc.Close()
+ return mc.Decode(m, data)
+}
+
func DecodeMessage(data []byte) (m Message, err error) {
m = NewMessage()
err = m.Decode(data)
return
}
-func (m *message) Encode(buffer []byte) ([]byte, error) {
+// Encode m using buffer. Return the final buffer used to hold m,
+// may be different if the initial buffer was not large enough.
+func (mc *MessageCodec) Encode(m Message, buffer []byte) ([]byte, error) {
+ pn := mc.pnMessage()
+ m.(*message).put(pn)
encode := func(buf []byte) ([]byte, error) {
len := cLen(buf)
- result := C.pn_message_encode(m.pn, cPtr(buf), &len)
+ result := C.pn_message_encode(pn, cPtr(buf), &len)
switch {
case result == C.PN_OVERFLOW:
return buf, overflow
@@ -380,50 +395,214 @@ func (m *message) Encode(buffer []byte) ([]byte, error) {
return encodeGrow(buffer, encode)
}
+func (m *message) Encode(buffer []byte) ([]byte, error) {
+ var mc MessageCodec
+ defer mc.Close()
+ return mc.Encode(m, buffer)
+}
+
// TODO aconway 2015-09-14: Multi-section messages.
-func (m *message) String() string {
- str := C.pn_string(C.CString(""))
- defer C.pn_free(unsafe.Pointer(str))
- C.pn_inspect(unsafe.Pointer(m.pn), str)
- return C.GoString(C.pn_string_get(str))
+type ignoreFunc func(v interface{}) bool
+
+func isNil(v interface{}) bool { return v == nil }
+func isZero(v interface{}) bool { return v == reflect.Zero(reflect.TypeOf(v)).Interface() }
+func isEmpty(v interface{}) bool { return reflect.ValueOf(v).Len() == 0 }
+
+type stringBuilder struct {
+ bytes.Buffer
+ separator string
}
-// ==== Deprecated functions
-func oldGetAnnotations(data *C.pn_data_t) (v map[string]interface{}) {
- if C.pn_data_size(data) > 0 {
+func (b *stringBuilder) field(name string, value interface{}, ignore ignoreFunc) {
+ if !ignore(value) {
+ b.WriteString(b.separator)
+ b.separator = ", "
+ b.WriteString(name)
+ b.WriteString(": ")
+ fmt.Fprintf(&b.Buffer, "%v", value)
+ }
+}
+
+// Human-readable string describing message.
+// Includes only message fields with non-default values.
+func (m *message) String() string {
+ var b stringBuilder
+ b.WriteString("Message{")
+ b.field("address", m.address, isEmpty)
+ b.field("durable", m.durable, isZero)
+ // Priority has weird default
+ b.field("priority", m.priority, func(v interface{}) bool { return v.(uint8) == 4 })
+ b.field("ttl", m.ttl, isZero)
+ b.field("first-acquirer", m.firstAcquirer, isZero)
+ b.field("delivery-count", m.deliveryCount, isZero)
+ b.field("message-id", m.messageId, isNil)
+ b.field("user-id", m.userId, isEmpty)
+ b.field("subject", m.subject, isEmpty)
+ b.field("reply-to", m.replyTo, isEmpty)
+ b.field("correlation-id", m.correlationId, isNil)
+ b.field("content-type", m.contentType, isEmpty)
+ b.field("content-encoding", m.contentEncoding, isEmpty)
+ b.field("expiry-time", m.expiryTime, isZero)
+ b.field("creation-time", m.creationTime, isZero)
+ b.field("group-id", m.groupId, isEmpty)
+ b.field("group-sequence", m.groupSequence, isZero)
+ b.field("reply-to-group-id", m.replyToGroupId, isEmpty)
+ b.field("inferred", m.inferred, isZero)
+ b.field("delivery-annotations", m.deliveryAnnotations, isEmpty)
+ b.field("message-annotations", m.messageAnnotations, isEmpty)
+ b.field("application-properties", m.applicationProperties, isEmpty)
+ b.field("body", m.body, isNil)
+ b.WriteString("}")
+ return b.String()
+}
+
+// ==== get message from pn_message_t
+
+func getData(v interface{}, data *C.pn_data_t) {
+ if data != nil && C.pn_data_size(data) > 0 {
C.pn_data_rewind(data)
C.pn_data_next(data)
- unmarshal(&v, data)
+ unmarshal(v, data)
+ }
+ return
+}
+
+func getString(c *C.char) string {
+ if c == nil {
+ return ""
+ }
+ return C.GoString(c)
+}
+
+func (m *message) get(pn *C.pn_message_t) {
+ m.Clear()
+ m.inferred = bool(C.pn_message_is_inferred(pn))
+ m.durable = bool(C.pn_message_is_durable(pn))
+ m.priority = uint8(C.pn_message_get_priority(pn))
+ m.ttl = goDuration(C.pn_message_get_ttl(pn))
+ m.firstAcquirer = bool(C.pn_message_is_first_acquirer(pn))
+ m.deliveryCount = uint32(C.pn_message_get_delivery_count(pn))
+ getData(&m.messageId, C.pn_message_id(pn))
+ m.userId = string(goBytes(C.pn_message_get_user_id(pn)))
+ m.address = getString(C.pn_message_get_address(pn))
+ m.subject = getString(C.pn_message_get_subject(pn))
+ m.replyTo = getString(C.pn_message_get_reply_to(pn))
+ getData(&m.correlationId, C.pn_message_correlation_id(pn))
+ m.contentType = getString(C.pn_message_get_content_type(pn))
+ m.contentEncoding = getString(C.pn_message_get_content_encoding(pn))
+ m.expiryTime = goTime(C.pn_message_get_expiry_time(pn))
+ m.creationTime = goTime(C.pn_message_get_creation_time(pn))
+ m.groupId = getString(C.pn_message_get_group_id(pn))
+ m.groupSequence = int32(C.pn_message_get_group_sequence(pn))
+ m.replyToGroupId = getString(C.pn_message_get_reply_to_group_id(pn))
+ getData(&m.deliveryAnnotations, C.pn_message_instructions(pn))
+ getData(&m.messageAnnotations, C.pn_message_annotations(pn))
+ getData(&m.applicationProperties, C.pn_message_properties(pn))
+ getData(&m.body, C.pn_message_body(pn))
+}
+
+// ==== put message to pn_message_t
+
+func putData(v interface{}, pn *C.pn_data_t) {
+ if v != nil {
+ C.pn_data_clear(pn)
+ marshal(v, pn)
+ }
+}
+
+// For pointer-based fields (pn_data_t, strings, bytes) only
+// put a field if it has a non-empty value
+func (m *message) put(pn *C.pn_message_t) {
+ C.pn_message_clear(pn)
+ C.pn_message_set_inferred(pn, C.bool(m.inferred))
+ C.pn_message_set_durable(pn, C.bool(m.durable))
+ C.pn_message_set_priority(pn, C.uint8_t(m.priority))
+ C.pn_message_set_ttl(pn, pnDuration(m.ttl))
+ C.pn_message_set_first_acquirer(pn, C.bool(m.firstAcquirer))
+ C.pn_message_set_delivery_count(pn, C.uint32_t(m.deliveryCount))
+ putData(m.messageId, C.pn_message_id(pn))
+ if m.userId != "" {
+ C.pn_message_set_user_id(pn, pnBytes(([]byte)(m.userId)))
+ }
+ if m.address != "" {
+ C.pn_message_set_address(pn, C.CString(m.address))
+ }
+ if m.subject != "" {
+ C.pn_message_set_subject(pn, C.CString(m.subject))
+ }
+ if m.replyTo != "" {
+ C.pn_message_set_reply_to(pn, C.CString(m.replyTo))
+ }
+ putData(m.correlationId, C.pn_message_correlation_id(pn))
+ if m.contentType != "" {
+ C.pn_message_set_content_type(pn, C.CString(m.contentType))
+ }
+ if m.contentEncoding != "" {
+ C.pn_message_set_content_encoding(pn, C.CString(m.contentEncoding))
+ }
+ C.pn_message_set_expiry_time(pn, pnTime(m.expiryTime))
+ C.pn_message_set_creation_time(pn, pnTime(m.creationTime))
+ if m.groupId != "" {
+ C.pn_message_set_group_id(pn, C.CString(m.groupId))
}
- return v
+ C.pn_message_set_group_sequence(pn, C.pn_sequence_t(m.groupSequence))
+ if m.replyToGroupId != "" {
+ C.pn_message_set_reply_to_group_id(pn, C.CString(m.replyToGroupId))
+ }
+ if len(m.deliveryAnnotations) != 0 {
+ putData(m.deliveryAnnotations, C.pn_message_instructions(pn))
+ }
+ if len(m.messageAnnotations) != 0 {
+ putData(m.messageAnnotations, C.pn_message_annotations(pn))
+ }
+ if len(m.applicationProperties) != 0 {
+ putData(m.applicationProperties, C.pn_message_properties(pn))
+ }
+ putData(m.body, C.pn_message_body(pn))
+}
+
+// ==== Deprecated functions
+
+func oldAnnotations(in map[AnnotationKey]interface{}) (out map[string]interface{}) {
+ if len(in) == 0 {
+ return nil
+ }
+ out = make(map[string]interface{})
+ for k, v := range in {
+ out[k.String()] = v
+ }
+ return
}
func (m *message) Instructions() map[string]interface{} {
- return oldGetAnnotations(C.pn_message_instructions(m.pn))
+ return oldAnnotations(m.deliveryAnnotations)
}
func (m *message) Annotations() map[string]interface{} {
- return oldGetAnnotations(C.pn_message_annotations(m.pn))
+ return oldAnnotations(m.messageAnnotations)
}
func (m *message) Properties() map[string]interface{} {
- return oldGetAnnotations(C.pn_message_properties(m.pn))
+ return m.applicationProperties
}
// Convert old string-keyed annotations to an AnnotationKey map
-func fixAnnotations(old map[string]interface{}) (annotations map[AnnotationKey]interface{}) {
- annotations = make(map[AnnotationKey]interface{})
- for k, v := range old {
- annotations[AnnotationKeyString(k)] = v
+func newAnnotations(in map[string]interface{}) (out map[AnnotationKey]interface{}) {
+ if len(in) == 0 {
+ return nil
+ }
+ out = make(map[AnnotationKey]interface{})
+ for k, v := range in {
+ out[AnnotationKeyString(k)] = v
}
return
}
func (m *message) SetInstructions(v map[string]interface{}) {
- setData(fixAnnotations(v), C.pn_message_instructions(m.pn))
+ m.deliveryAnnotations = newAnnotations(v)
}
func (m *message) SetAnnotations(v map[string]interface{}) {
- setData(fixAnnotations(v), C.pn_message_annotations(m.pn))
+ m.messageAnnotations = newAnnotations(v)
}
func (m *message) SetProperties(v map[string]interface{}) {
- setData(fixAnnotations(v), C.pn_message_properties(m.pn))
+ m.applicationProperties = v
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/886d2b93/go/src/qpid.apache.org/amqp/message_test.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/amqp/message_test.go b/go/src/qpid.apache.org/amqp/message_test.go
index b22c60d..668ca49 100644
--- a/go/src/qpid.apache.org/amqp/message_test.go
+++ b/go/src/qpid.apache.org/amqp/message_test.go
@@ -58,14 +58,14 @@ func TestDefaultMessage(t *testing.T) {
{"ReplyToGroupId", ""},
{"MessageId", nil},
{"CorrelationId", nil},
- {"DeliveryAnnotations", map[AnnotationKey]interface{}(nil)},
- {"MessageAnnotations", map[AnnotationKey]interface{}(nil)},
- {"ApplicationProperties", map[string]interface{}(nil)},
+ {"DeliveryAnnotations", map[AnnotationKey]interface{}{}},
+ {"MessageAnnotations", map[AnnotationKey]interface{}{}},
+ {"ApplicationProperties", map[string]interface{}{}},
// Deprecated
{"Instructions", map[string]interface{}(nil)},
{"Annotations", map[string]interface{}(nil)},
- {"Properties", map[string]interface{}(nil)},
+ {"Properties", map[string]interface{}{}},
{"Body", nil},
} {
ret := mv.MethodByName(x.method).Call(nil)
@@ -91,7 +91,7 @@ func TestMessageString(t *testing.T) {
if err := roundTrip(m); err != nil {
t.Error(err)
}
- msgstr := `Message{user_id="user", instructions={:instructions="foo"}, annotations={:annotations="bar"}, properties={"int"=32}, body="hello"}`
+ msgstr := "Message{user-id: user, delivery-annotations: map[instructions:foo], message-annotations: map[annotations:bar], application-properties: map[int:32], body: hello}"
if err := checkEqual(msgstr, m.String()); err != nil {
t.Error(err)
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org