You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/09 01:26:11 UTC
[48/50] [abbrv] qpid-proton git commit: PROTON-1450: go electron filter support
PROTON-1450: go electron filter support
electron.Filter() creates a LinkOption that sets the link's source filter; LinkSettings.Filter() returns that filter.
proton.Data.Marshal() and proton.Data.Unmarshal() allow setting and getting the content of a proton.Data.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c31e2ecf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c31e2ecf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c31e2ecf
Branch: refs/heads/go1
Commit: c31e2ecf08fa3e5b46dc5c00a81049ec0d555196
Parents: 4cf899b
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jun 8 21:05:51 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jun 8 21:18:47 2017 -0400
----------------------------------------------------------------------
.../go/src/qpid.apache.org/amqp/marshal.go | 33 ++++++++-------
.../go/src/qpid.apache.org/amqp/message.go | 8 ----
.../go/src/qpid.apache.org/amqp/unmarshal.go | 7 ++++
.../go/src/qpid.apache.org/electron/link.go | 42 +++++++++++++++-----
.../src/qpid.apache.org/electron/link_test.go | 5 ++-
.../go/src/qpid.apache.org/proton/wrappers.go | 37 ++++++++++++-----
6 files changed, 89 insertions(+), 43 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
index dad3d25..ca5e380 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
@@ -50,6 +50,16 @@ func dataMarshalError(v interface{}, data *C.pn_data_t) error {
return nil
}
+func recoverMarshal(err *error) {
+ if r := recover(); r != nil {
+ if merr, ok := r.(*MarshalError); ok {
+ *err = merr
+ } else {
+ panic(r)
+ }
+ }
+}
+
/*
Marshal encodes a Go value as AMQP data in buffer.
If buffer is nil, or is not large enough, a new buffer is created.
@@ -99,16 +109,7 @@ Go types: struct, complex64/128.
AMQP types: decimal32/64/128, char, timestamp, uuid, array.
*/
func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
- defer func() {
- if r := recover(); r != nil {
- if merr, ok := r.(*MarshalError); ok {
- err = merr
- } else {
- panic(r)
- }
- }
- }()
-
+ defer recoverMarshal(&err)
data := C.pn_data(0)
defer C.pn_data_free(data)
marshal(v, data)
@@ -126,6 +127,13 @@ func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
return encodeGrow(buffer, encode)
}
+// Internal
+func MarshalUnsafe(v interface{}, pn_data unsafe.Pointer) (err error) {
+ defer recoverMarshal(&err)
+ marshal(v, (*C.pn_data_t)(pn_data))
+ return
+}
+
const minEncode = 256
// overflow is returned when an encoding function can't fit data in the buffer.
@@ -271,8 +279,3 @@ func (e *Encoder) Encode(v interface{}) (err error) {
}
return err
}
-
-func replace(data *C.pn_data_t, v interface{}) {
- C.pn_data_clear(data)
- marshal(v, data)
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
index f2270f0..d1ad2eb 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
@@ -38,7 +38,6 @@ import (
"fmt"
"runtime"
"time"
- "unsafe"
)
// Message is the interface to an AMQP message.
@@ -275,13 +274,6 @@ func setData(v interface{}, data *C.pn_data_t) {
marshal(v, data)
}
-func dataString(data *C.pn_data_t) string {
- str := C.pn_string(C.CString(""))
- defer C.pn_free(unsafe.Pointer(str))
- C.pn_inspect(unsafe.Pointer(data), str)
- return C.GoString(C.pn_string_get(str))
-}
-
func (m *message) SetInferred(b bool) { C.pn_message_set_inferred(m.pn, C.bool(b)) }
func (m *message) SetDurable(b bool) { C.pn_message_set_durable(m.pn, C.bool(b)) }
func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
index f679abc..253d66d 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
@@ -223,6 +223,13 @@ func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
return n, nil
}
+// Internal
+func UnmarshalUnsafe(pn_data unsafe.Pointer, v interface{}) (err error) {
+ defer recoverUnmarshal(&err)
+ unmarshal(v, (*C.pn_data_t)(pn_data))
+ return
+}
+
// more reads more data when we can't parse a complete AMQP type
func (d *Decoder) more() error {
var readSize int64 = minDecode
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 4f927c1..de8a995 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -21,6 +21,7 @@ package electron
import (
"fmt"
+ "qpid.apache.org/amqp"
"qpid.apache.org/proton"
"time"
)
@@ -52,6 +53,9 @@ type LinkSettings interface {
// Session containing the Link
Session() Session
+ // Filter for the link
+ Filter() map[amqp.Symbol]interface{}
+
// Advanced settings for the source
SourceSettings() TerminusSettings
@@ -116,6 +120,11 @@ func AtLeastOnce() LinkOption {
}
}
+// Filter returns a LinkOption that sets a filter.
+func Filter(m map[amqp.Symbol]interface{}) LinkOption {
+ return func(l *linkSettings) { l.filter = m }
+}
+
// SourceSettings returns a LinkOption that sets all the SourceSettings.
// Note: it will override the source address set by a Source() option
func SourceSettings(ts TerminusSettings) LinkOption {
@@ -161,6 +170,7 @@ type linkSettings struct {
rcvSettle RcvSettleMode
capacity int
prefetch bool
+ filter map[amqp.Symbol]interface{}
session *session
pLink proton.Link
}
@@ -189,15 +199,16 @@ type link struct {
linkSettings
}
-func (l *linkSettings) Source() string { return l.source }
-func (l *linkSettings) Target() string { return l.target }
-func (l *linkSettings) LinkName() string { return l.linkName }
-func (l *linkSettings) IsSender() bool { return l.isSender }
-func (l *linkSettings) IsReceiver() bool { return !l.isSender }
-func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
-func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
-func (l *linkSettings) SourceSettings() TerminusSettings { return l.sourceSettings }
-func (l *linkSettings) TargetSettings() TerminusSettings { return l.targetSettings }
+func (l *linkSettings) Source() string { return l.source }
+func (l *linkSettings) Target() string { return l.target }
+func (l *linkSettings) LinkName() string { return l.linkName }
+func (l *linkSettings) IsSender() bool { return l.isSender }
+func (l *linkSettings) IsReceiver() bool { return !l.isSender }
+func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
+func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
+func (l *linkSettings) Filter() map[amqp.Symbol]interface{} { return l.filter }
+func (l *linkSettings) SourceSettings() TerminusSettings { return l.sourceSettings }
+func (l *linkSettings) TargetSettings() TerminusSettings { return l.targetSettings }
func (l *link) Session() Session { return l.session }
func (l *link) Connection() Connection { return l.session.Connection() }
@@ -227,6 +238,12 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti
return l, fmt.Errorf("cannot create link %s", l.pLink)
}
l.pLink.Source().SetAddress(l.source)
+
+ if len(l.filter) > 0 {
+ if err := l.pLink.Source().Filter().Marshal(l.filter); err != nil {
+ panic(err) // Shouldn't happen
+ }
+ }
l.pLink.Source().SetDurability(l.sourceSettings.Durability)
l.pLink.Source().SetExpiryPolicy(l.sourceSettings.Expiry)
l.pLink.Source().SetTimeout(l.sourceSettings.Timeout)
@@ -245,7 +262,7 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti
}
func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
- return linkSettings{
+ l := linkSettings{
isSender: pLink.IsSender(),
source: pLink.RemoteSource().Address(),
sourceSettings: makeTerminusSettings(pLink.RemoteSource()),
@@ -259,6 +276,11 @@ func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
pLink: pLink,
session: sn,
}
+ filter := l.pLink.RemoteSource().Filter()
+ if !filter.Empty() {
+ filter.Unmarshal(&l.filter) // TODO aconway 2017-06-08: ignoring errors
+ }
+ return l
}
// Not part of Link interface but use by Sender and Receiver.
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
index 2576046..feb1f20 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
@@ -22,6 +22,7 @@ package electron
import (
"net"
+ "qpid.apache.org/amqp"
"qpid.apache.org/proton"
"testing"
"time"
@@ -31,6 +32,7 @@ func TestLinkSettings(t *testing.T) {
cConn, sConn := net.Pipe()
done := make(chan error)
settings := TerminusSettings{Durability: 1, Expiry: 2, Timeout: 42 * time.Second, Dynamic: true}
+ filterMap := map[amqp.Symbol]interface{}{"int": int32(33), "str": "hello"}
go func() { // Server
close(done)
defer sConn.Close()
@@ -48,6 +50,7 @@ func TestLinkSettings(t *testing.T) {
errorIf(t, checkEqual("two", ep.LinkName()))
errorIf(t, checkEqual("two.source", ep.Source()))
errorIf(t, checkEqual(TerminusSettings{Durability: proton.Deliveries, Expiry: proton.ExpireNever}, ep.SourceSettings()))
+ errorIf(t, checkEqual(filterMap, ep.Filter()))
}
}
}()
@@ -57,7 +60,7 @@ func TestLinkSettings(t *testing.T) {
fatalIf(t, err)
c.Sender(Source("one.source"), Target("one.target"), TargetSettings(settings))
- c.Receiver(Source("two.source"), DurableSubscription("two"))
+ c.Receiver(Source("two.source"), DurableSubscription("two"), Filter(filterMap))
c.Close(nil)
<-done
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index b6386b8..879ad53 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -114,18 +114,37 @@ func (e Event) String() string { return e.Type().String() }
// functions) to let them inject functions back into the handlers goroutine.
func (e Event) Injecter() Injecter { return e.injecter }
-// Data holds a pointer to decoded AMQP data.
-// Use amqp.marshal/unmarshal to access it as Go data types.
-//
+// Data is an intermediate form of decoded AMQP data.
type Data struct{ pn *C.pn_data_t }
-func NewData(p unsafe.Pointer) Data { return Data{(*C.pn_data_t)(p)} }
+func (d Data) Free() { C.pn_data_free(d.pn) }
+func (d Data) CPtr() unsafe.Pointer { return unsafe.Pointer(d.pn) }
+func (d Data) Clear() { C.pn_data_clear(d.pn) }
+func (d Data) Rewind() { C.pn_data_rewind(d.pn) }
+func (d Data) Next() { C.pn_data_next(d.pn) }
+func (d Data) Error() error { return PnError(C.pn_data_error(d.pn)) }
+func (d Data) Empty() bool { return C.pn_data_size(d.pn) == 0 }
+
+func (d Data) String() string {
+ str := C.pn_string(C.CString(""))
+ defer C.pn_free(unsafe.Pointer(str))
+ C.pn_inspect(unsafe.Pointer(d.pn), str)
+ return C.GoString(C.pn_string_get(str))
+}
+
+// Unmarshal the value of d into value pointed at by ptr, see amqp.Unmarshal() for details
+func (d Data) Unmarshal(ptr interface{}) error {
+ d.Rewind()
+ d.Next()
+ err := amqp.UnmarshalUnsafe(d.CPtr(), ptr)
+ return err
+}
-func (d Data) Free() { C.pn_data_free(d.pn) }
-func (d Data) Pointer() unsafe.Pointer { return unsafe.Pointer(d.pn) }
-func (d Data) Clear() { C.pn_data_clear(d.pn) }
-func (d Data) Rewind() { C.pn_data_rewind(d.pn) }
-func (d Data) Error() error { return PnError(C.pn_data_error(d.pn)) }
+// Marshal the value v into d, see amqp.Marshal() for details
+func (d Data) Marshal(v interface{}) error {
+ d.Clear()
+ return amqp.MarshalUnsafe(v, d.CPtr())
+}
// State holds the state flags for an AMQP endpoint.
type State byte
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org