You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2018/10/11 22:21:42 UTC
[10/12] qpid-proton git commit: PROTON-1910: [go] proton.Link allow
sending/receiving message as bytes
PROTON-1910: [go] proton.Link allow sending/receiving message as bytes
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ef716fa0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ef716fa0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ef716fa0
Branch: refs/heads/master
Commit: ef716fa008e709efbab6ed64edfa53f0361c3262
Parents: 2a84494
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Oct 11 15:19:00 2018 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Oct 11 15:19:00 2018 -0400
----------------------------------------------------------------------
go/src/qpid.apache.org/proton/message.go | 42 ++++++++++++++++++++-------
1 file changed, 32 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ef716fa0/go/src/qpid.apache.org/proton/message.go
----------------------------------------------------------------------
diff --git a/go/src/qpid.apache.org/proton/message.go b/go/src/qpid.apache.org/proton/message.go
index fbb1d48..7c166a5 100644
--- a/go/src/qpid.apache.org/proton/message.go
+++ b/go/src/qpid.apache.org/proton/message.go
@@ -26,9 +26,10 @@ import "C"
import (
"fmt"
- "qpid.apache.org/amqp"
"strconv"
"sync/atomic"
+
+ "qpid.apache.org/amqp"
)
// HasMessage is true if all message data is available.
@@ -41,7 +42,23 @@ func (d Delivery) HasMessage() bool { return !d.IsNil() && d.Readable() && !d.Pa
// handling an MMessage event is always a safe context to call this function.
//
// Will return an error if message is incomplete or not current.
-func (delivery Delivery) Message() (m amqp.Message, err error) {
+func (delivery Delivery) Message() (amqp.Message, error) {
+ var err error
+ if bytes, err := delivery.MessageBytes(); err == nil {
+ m := amqp.NewMessage()
+ err = m.Decode(bytes)
+ return m, err
+ }
+ return nil, err
+}
+
+// MessageBytes extracts the raw message bytes contained in a delivery.
+//
+// Must be called in the correct link context with this delivery as the current message,
+// handling an MMessage event is always a safe context to call this function.
+//
+// Will return an error if message is incomplete or not current.
+func (delivery Delivery) MessageBytes() ([]byte, error) {
if !delivery.Readable() {
return nil, fmt.Errorf("delivery is not readable")
}
@@ -53,9 +70,7 @@ func (delivery Delivery) Message() (m amqp.Message, err error) {
if result != len(data) {
return nil, fmt.Errorf("cannot receive message: %s", PnErrorCode(result))
}
- m = amqp.NewMessage()
- err = m.Decode(data)
- return
+ return data, nil
}
// Process-wide atomic counter for generating tag names
@@ -68,15 +83,22 @@ func nextTag() string {
// Send sends an amqp.Message over a Link.
// Returns a Delivery that can be used to determine the outcome of the message.
func (link Link) Send(m amqp.Message) (Delivery, error) {
+ var err error
+ if bytes, err := m.Encode(nil); err == nil {
+ if d, err := link.SendMessageBytes(bytes); err == nil {
+ return d, err
+ }
+ }
+ return Delivery{}, err
+}
+
+// SendMessageBytes sends encoded bytes of an amqp.Message over a Link.
+// Returns a Delivery that can be used to determine the outcome of the message.
+func (link Link) SendMessageBytes(bytes []byte) (Delivery, error) {
if !link.IsSender() {
return Delivery{}, fmt.Errorf("attempt to send message on receiving link")
}
-
delivery := link.Delivery(nextTag())
- bytes, err := m.Encode(nil)
- if err != nil {
- return Delivery{}, fmt.Errorf("cannot send message %s", err)
- }
result := link.SendBytes(bytes)
link.Advance()
if result != len(bytes) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org