You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/06/09 01:26:11 UTC

[48/50] [abbrv] qpid-proton git commit: PROTON-1450: go electron filter support

PROTON-1450: go electron filter support

electron.Filter() creates LinkOption to set filter, LinkSettings.Filter() gets filter.
proton.Data.Marshal()/Unmarshal() allow setting/getting proton.Data content.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/c31e2ecf
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/c31e2ecf
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/c31e2ecf

Branch: refs/heads/go1
Commit: c31e2ecf08fa3e5b46dc5c00a81049ec0d555196
Parents: 4cf899b
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Jun 8 21:05:51 2017 -0400
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Jun 8 21:18:47 2017 -0400

----------------------------------------------------------------------
 .../go/src/qpid.apache.org/amqp/marshal.go      | 33 ++++++++-------
 .../go/src/qpid.apache.org/amqp/message.go      |  8 ----
 .../go/src/qpid.apache.org/amqp/unmarshal.go    |  7 ++++
 .../go/src/qpid.apache.org/electron/link.go     | 42 +++++++++++++++-----
 .../src/qpid.apache.org/electron/link_test.go   |  5 ++-
 .../go/src/qpid.apache.org/proton/wrappers.go   | 37 ++++++++++++-----
 6 files changed, 89 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
index dad3d25..ca5e380 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/marshal.go
@@ -50,6 +50,16 @@ func dataMarshalError(v interface{}, data *C.pn_data_t) error {
 	return nil
 }
 
+func recoverMarshal(err *error) { // Meant to be deferred: reports a *MarshalError panic via *err.
+	if r := recover(); r != nil {
+		if merr, ok := r.(*MarshalError); ok {
+			*err = merr
+		} else {
+			panic(r) // Not a marshal failure: re-raise the original panic unchanged.
+		}
+	}
+}
+
 /*
 Marshal encodes a Go value as AMQP data in buffer.
 If buffer is nil, or is not large enough, a new buffer  is created.
@@ -99,16 +109,7 @@ Go types: struct, complex64/128.
 AMQP types: decimal32/64/128, char, timestamp, uuid, array.
 */
 func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
-	defer func() {
-		if r := recover(); r != nil {
-			if merr, ok := r.(*MarshalError); ok {
-				err = merr
-			} else {
-				panic(r)
-			}
-		}
-	}()
-
+	defer recoverMarshal(&err)
 	data := C.pn_data(0)
 	defer C.pn_data_free(data)
 	marshal(v, data)
@@ -126,6 +127,13 @@ func Marshal(v interface{}, buffer []byte) (outbuf []byte, err error) {
 	return encodeGrow(buffer, encode)
 }
 
+// MarshalUnsafe is internal: it marshals v into the C pn_data_t that pn_data points to.
+func MarshalUnsafe(v interface{}, pn_data unsafe.Pointer) (err error) {
+	defer recoverMarshal(&err) // A *MarshalError panic raised by marshal becomes the returned err.
+	marshal(v, (*C.pn_data_t)(pn_data))
+	return
+}
+
 const minEncode = 256
 
 // overflow is returned when an encoding function can't fit data in the buffer.
@@ -271,8 +279,3 @@ func (e *Encoder) Encode(v interface{}) (err error) {
 	}
 	return err
 }
-
-func replace(data *C.pn_data_t, v interface{}) {
-	C.pn_data_clear(data)
-	marshal(v, data)
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
index f2270f0..d1ad2eb 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/message.go
@@ -38,7 +38,6 @@ import (
 	"fmt"
 	"runtime"
 	"time"
-	"unsafe"
 )
 
 // Message is the interface to an AMQP message.
@@ -275,13 +274,6 @@ func setData(v interface{}, data *C.pn_data_t) {
 	marshal(v, data)
 }
 
-func dataString(data *C.pn_data_t) string {
-	str := C.pn_string(C.CString(""))
-	defer C.pn_free(unsafe.Pointer(str))
-	C.pn_inspect(unsafe.Pointer(data), str)
-	return C.GoString(C.pn_string_get(str))
-}
-
 func (m *message) SetInferred(b bool)  { C.pn_message_set_inferred(m.pn, C.bool(b)) }
 func (m *message) SetDurable(b bool)   { C.pn_message_set_durable(m.pn, C.bool(b)) }
 func (m *message) SetPriority(b uint8) { C.pn_message_set_priority(m.pn, C.uint8_t(b)) }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
index f679abc..253d66d 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/amqp/unmarshal.go
@@ -223,6 +223,13 @@ func Unmarshal(bytes []byte, v interface{}) (n int, err error) {
 	return n, nil
 }
 
+// UnmarshalUnsafe is internal: it unmarshals the C pn_data_t that pn_data points to into v.
+func UnmarshalUnsafe(pn_data unsafe.Pointer, v interface{}) (err error) {
+	defer recoverUnmarshal(&err)
+	unmarshal(v, (*C.pn_data_t)(pn_data))
+	return
+}
+
 // more reads more data when we can't parse a complete AMQP type
 func (d *Decoder) more() error {
 	var readSize int64 = minDecode

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 4f927c1..de8a995 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -21,6 +21,7 @@ package electron
 
 import (
 	"fmt"
+	"qpid.apache.org/amqp"
 	"qpid.apache.org/proton"
 	"time"
 )
@@ -52,6 +53,9 @@ type LinkSettings interface {
 	// Session containing the Link
 	Session() Session
 
+	// Filter for the link
+	Filter() map[amqp.Symbol]interface{}
+
 	// Advanced settings for the source
 	SourceSettings() TerminusSettings
 
@@ -116,6 +120,11 @@ func AtLeastOnce() LinkOption {
 	}
 }
 
+// Filter returns a LinkOption that sets the link's filter to m; the map is stored as-is, not copied.
+func Filter(m map[amqp.Symbol]interface{}) LinkOption {
+	return func(l *linkSettings) { l.filter = m }
+}
+
 // SourceSettings returns a LinkOption that sets all the SourceSettings.
 // Note: it will override the source address set by a Source() option
 func SourceSettings(ts TerminusSettings) LinkOption {
@@ -161,6 +170,7 @@ type linkSettings struct {
 	rcvSettle      RcvSettleMode
 	capacity       int
 	prefetch       bool
+	filter         map[amqp.Symbol]interface{}
 	session        *session
 	pLink          proton.Link
 }
@@ -189,15 +199,16 @@ type link struct {
 	linkSettings
 }
 
-func (l *linkSettings) Source() string                   { return l.source }
-func (l *linkSettings) Target() string                   { return l.target }
-func (l *linkSettings) LinkName() string                 { return l.linkName }
-func (l *linkSettings) IsSender() bool                   { return l.isSender }
-func (l *linkSettings) IsReceiver() bool                 { return !l.isSender }
-func (l *linkSettings) SndSettle() SndSettleMode         { return l.sndSettle }
-func (l *linkSettings) RcvSettle() RcvSettleMode         { return l.rcvSettle }
-func (l *linkSettings) SourceSettings() TerminusSettings { return l.sourceSettings }
-func (l *linkSettings) TargetSettings() TerminusSettings { return l.targetSettings }
+func (l *linkSettings) Source() string                      { return l.source }
+func (l *linkSettings) Target() string                      { return l.target }
+func (l *linkSettings) LinkName() string                    { return l.linkName }
+func (l *linkSettings) IsSender() bool                      { return l.isSender }
+func (l *linkSettings) IsReceiver() bool                    { return !l.isSender }
+func (l *linkSettings) SndSettle() SndSettleMode            { return l.sndSettle }
+func (l *linkSettings) RcvSettle() RcvSettleMode            { return l.rcvSettle }
+func (l *linkSettings) Filter() map[amqp.Symbol]interface{} { return l.filter }
+func (l *linkSettings) SourceSettings() TerminusSettings    { return l.sourceSettings }
+func (l *linkSettings) TargetSettings() TerminusSettings    { return l.targetSettings }
 
 func (l *link) Session() Session       { return l.session }
 func (l *link) Connection() Connection { return l.session.Connection() }
@@ -227,6 +238,12 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti
 		return l, fmt.Errorf("cannot create link %s", l.pLink)
 	}
 	l.pLink.Source().SetAddress(l.source)
+
+	if len(l.filter) > 0 {
+		if err := l.pLink.Source().Filter().Marshal(l.filter); err != nil {
+			panic(err) // Shouldn't happen
+		}
+	}
 	l.pLink.Source().SetDurability(l.sourceSettings.Durability)
 	l.pLink.Source().SetExpiryPolicy(l.sourceSettings.Expiry)
 	l.pLink.Source().SetTimeout(l.sourceSettings.Timeout)
@@ -245,7 +262,7 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti
 }
 
 func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
-	return linkSettings{
+	l := linkSettings{
 		isSender:       pLink.IsSender(),
 		source:         pLink.RemoteSource().Address(),
 		sourceSettings: makeTerminusSettings(pLink.RemoteSource()),
@@ -259,6 +276,11 @@ func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
 		pLink:          pLink,
 		session:        sn,
 	}
+	filter := l.pLink.RemoteSource().Filter()
+	if !filter.Empty() {
+		filter.Unmarshal(&l.filter) // TODO aconway 2017-06-08: ignoring errors
+	}
+	return l
 }
 
 // Not part of Link interface but use by Sender and Receiver.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
index 2576046..feb1f20 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
@@ -22,6 +22,7 @@ package electron
 
 import (
 	"net"
+	"qpid.apache.org/amqp"
 	"qpid.apache.org/proton"
 	"testing"
 	"time"
@@ -31,6 +32,7 @@ func TestLinkSettings(t *testing.T) {
 	cConn, sConn := net.Pipe()
 	done := make(chan error)
 	settings := TerminusSettings{Durability: 1, Expiry: 2, Timeout: 42 * time.Second, Dynamic: true}
+	filterMap := map[amqp.Symbol]interface{}{"int": int32(33), "str": "hello"}
 	go func() { // Server
 		close(done)
 		defer sConn.Close()
@@ -48,6 +50,7 @@ func TestLinkSettings(t *testing.T) {
 				errorIf(t, checkEqual("two", ep.LinkName()))
 				errorIf(t, checkEqual("two.source", ep.Source()))
 				errorIf(t, checkEqual(TerminusSettings{Durability: proton.Deliveries, Expiry: proton.ExpireNever}, ep.SourceSettings()))
+				errorIf(t, checkEqual(filterMap, ep.Filter()))
 			}
 		}
 	}()
@@ -57,7 +60,7 @@ func TestLinkSettings(t *testing.T) {
 	fatalIf(t, err)
 	c.Sender(Source("one.source"), Target("one.target"), TargetSettings(settings))
 
-	c.Receiver(Source("two.source"), DurableSubscription("two"))
+	c.Receiver(Source("two.source"), DurableSubscription("two"), Filter(filterMap))
 	c.Close(nil)
 	<-done
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/c31e2ecf/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index b6386b8..879ad53 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -114,18 +114,37 @@ func (e Event) String() string         { return e.Type().String() }
 // functions) to let them inject functions back into the handlers goroutine.
 func (e Event) Injecter() Injecter { return e.injecter }
 
-// Data holds a pointer to decoded AMQP data.
-// Use amqp.marshal/unmarshal to access it as Go data types.
-//
+// Data is an intermediate form of decoded AMQP data.
 type Data struct{ pn *C.pn_data_t }
 
-func NewData(p unsafe.Pointer) Data { return Data{(*C.pn_data_t)(p)} }
+func (d Data) Free()                { C.pn_data_free(d.pn) }
+func (d Data) CPtr() unsafe.Pointer { return unsafe.Pointer(d.pn) }
+func (d Data) Clear()               { C.pn_data_clear(d.pn) }
+func (d Data) Rewind()              { C.pn_data_rewind(d.pn) }
+func (d Data) Next()                { C.pn_data_next(d.pn) }
+func (d Data) Error() error         { return PnError(C.pn_data_error(d.pn)) }
+func (d Data) Empty() bool          { return C.pn_data_size(d.pn) == 0 }
+
+func (d Data) String() string { // Returns the text that pn_inspect writes for d.
+	str := C.pn_string(C.CString("")) // NOTE(review): this C.CString result is never freed here — possible leak, confirm.
+	defer C.pn_free(unsafe.Pointer(str))
+	C.pn_inspect(unsafe.Pointer(d.pn), str)
+	return C.GoString(C.pn_string_get(str))
+}
+
+// Unmarshal decodes the value of d into the value pointed at by ptr; see amqp.Unmarshal() for details.
+func (d Data) Unmarshal(ptr interface{}) error {
+	d.Rewind()
+	d.Next() // NOTE(review): presumably moves to the first value after Rewind — confirm against pn_data_next docs.
+	err := amqp.UnmarshalUnsafe(d.CPtr(), ptr)
+	return err
+}
 
-func (d Data) Free()                   { C.pn_data_free(d.pn) }
-func (d Data) Pointer() unsafe.Pointer { return unsafe.Pointer(d.pn) }
-func (d Data) Clear()                  { C.pn_data_clear(d.pn) }
-func (d Data) Rewind()                 { C.pn_data_rewind(d.pn) }
-func (d Data) Error() error            { return PnError(C.pn_data_error(d.pn)) }
+// Marshal clears d and then encodes the value v into it; see amqp.Marshal() for details.
+func (d Data) Marshal(v interface{}) error {
+	d.Clear()
+	return amqp.MarshalUnsafe(v, d.CPtr())
+}
 
 // State holds the state flags for an AMQP endpoint.
 type State byte


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org