You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:43:17 UTC
[4/6] qpid-proton git commit: PROTON-1415: go binding does not create
durable subscriber
PROTON-1415: go binding does not create durable subscriber
Added `DurableSubscription(name string) LinkOption`; pass it to Connection.Receiver()
to create a named durable subscription.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/38592d16
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/38592d16
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/38592d16
Branch: refs/heads/master
Commit: 38592d1668eef9d192ed0c814b34e807039777d3
Parents: 540e74e
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Feb 23 14:02:14 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 23 15:30:05 2017 -0500
----------------------------------------------------------------------
.../src/qpid.apache.org/electron/connection.go | 2 +-
.../go/src/qpid.apache.org/electron/link.go | 128 ++++++++++++++-----
.../src/qpid.apache.org/electron/link_test.go | 63 +++++++++
3 files changed, 160 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38592d16/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 7f3050f..8f62491 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -276,7 +276,7 @@ func (c *connection) WaitTimeout(timeout time.Duration) error {
}
func (c *connection) Incoming() <-chan Incoming {
- assert(c.incoming != nil, "electron.Connection.Incoming() disabled for %s", c)
+ assert(c.incoming != nil, "Incoming() is only allowed for a Connection created with the Server() option: %s", c)
return c.incoming
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38592d16/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 1d17894..4f927c1 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -22,6 +22,7 @@ package electron
import (
"fmt"
"qpid.apache.org/proton"
+ "time"
)
// Settings associated with a link
@@ -50,6 +51,12 @@ type LinkSettings interface {
// Session containing the Link
Session() Session
+
+ // Advanced settings for the source
+ SourceSettings() TerminusSettings
+
+ // Advanced settings for the target
+ TargetSettings() TerminusSettings
}
// LinkOption can be passed when creating a sender or receiver link to set optional configuration.
@@ -62,7 +69,7 @@ func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s }
func Target(s string) LinkOption { return func(l *linkSettings) { l.target = s } }
// LinkName returns a LinkOption that sets the link name.
-func LinkName(s string) LinkOption { return func(l *linkSettings) { l.target = s } }
+func LinkName(s string) LinkOption { return func(l *linkSettings) { l.linkName = s } }
// SndSettle returns a LinkOption that sets the send settle mode
func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sndSettle = m } }
@@ -70,16 +77,23 @@ func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sn
// RcvSettle returns a LinkOption that sets the receive settle mode
func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { l.rcvSettle = m } }
-// SndSettleMode returns a LinkOption that defines when the sending end of the
-// link settles message delivery.
-type SndSettleMode proton.SndSettleMode
-
// Capacity returns a LinkOption that sets the link capacity
func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = n } }
// Prefetch returns a LinkOption that sets a receiver's pre-fetch flag. Not relevant for a sender.
func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = p } }
+// DurableSubscription returns a LinkOption that configures a Receiver as a named durable
+// subscription. The name overrides (and is overridden by) LinkName() so you should normally
+// only use one of these options.
+func DurableSubscription(name string) LinkOption {
+ return func(l *linkSettings) {
+ l.linkName = name
+ l.sourceSettings.Durability = proton.Deliveries
+ l.sourceSettings.Expiry = proton.ExpireNever
+ }
+}
+
// AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
// are sent but no acknowledgment is received, messages can be lost if there is
// a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
@@ -102,6 +116,21 @@ func AtLeastOnce() LinkOption {
}
}
+// SourceSettings returns a LinkOption that sets all the SourceSettings.
+// Note: it replaces source settings made by other options such as DurableSubscription(), but not the address set by Source().
+func SourceSettings(ts TerminusSettings) LinkOption {
+ return func(l *linkSettings) { l.sourceSettings = ts }
+}
+
+// TargetSettings returns a LinkOption that sets all the TargetSettings.
+// Note: it replaces all target terminus settings, but not the address set by a Target() option.
+func TargetSettings(ts TerminusSettings) LinkOption {
+ return func(l *linkSettings) { l.targetSettings = ts }
+}
+
+// SndSettleMode defines when the sending end of the link settles message delivery.
+type SndSettleMode proton.SndSettleMode
+
const (
// Messages are sent unsettled
SndUnsettled = SndSettleMode(proton.SndUnsettled)
@@ -122,16 +151,37 @@ const (
)
type linkSettings struct {
- source string
- target string
- linkName string
- isSender bool
- sndSettle SndSettleMode
- rcvSettle RcvSettleMode
- capacity int
- prefetch bool
- session *session
- pLink proton.Link
+ source string
+ sourceSettings TerminusSettings
+ target string
+ targetSettings TerminusSettings
+ linkName string
+ isSender bool
+ sndSettle SndSettleMode
+ rcvSettle RcvSettleMode
+ capacity int
+ prefetch bool
+ session *session
+ pLink proton.Link
+}
+
+// TerminusSettings holds advanced AMQP settings for the source or target of a link.
+// Usually these can be set via a more descriptive LinkOption, e.g. DurableSubscription(),
+// and do not need to be set/examined directly.
+type TerminusSettings struct {
+ Durability proton.Durability
+ Expiry proton.ExpiryPolicy
+ Timeout time.Duration
+ Dynamic bool
+}
+
+func makeTerminusSettings(t proton.Terminus) TerminusSettings {
+ return TerminusSettings{
+ Durability: t.Durability(),
+ Expiry: t.ExpiryPolicy(),
+ Timeout: t.Timeout(),
+ Dynamic: t.IsDynamic(),
+ }
}
type link struct {
@@ -139,13 +189,15 @@ type link struct {
linkSettings
}
-func (l *linkSettings) Source() string { return l.source }
-func (l *linkSettings) Target() string { return l.target }
-func (l *linkSettings) LinkName() string { return l.linkName }
-func (l *linkSettings) IsSender() bool { return l.isSender }
-func (l *linkSettings) IsReceiver() bool { return !l.isSender }
-func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
-func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
+func (l *linkSettings) Source() string { return l.source }
+func (l *linkSettings) Target() string { return l.target }
+func (l *linkSettings) LinkName() string { return l.linkName }
+func (l *linkSettings) IsSender() bool { return l.isSender }
+func (l *linkSettings) IsReceiver() bool { return !l.isSender }
+func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
+func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
+func (l *linkSettings) SourceSettings() TerminusSettings { return l.sourceSettings }
+func (l *linkSettings) TargetSettings() TerminusSettings { return l.targetSettings }
func (l *link) Session() Session { return l.session }
func (l *link) Connection() Connection { return l.session.Connection() }
@@ -175,7 +227,17 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti
return l, fmt.Errorf("cannot create link %s", l.pLink)
}
l.pLink.Source().SetAddress(l.source)
+ l.pLink.Source().SetDurability(l.sourceSettings.Durability)
+ l.pLink.Source().SetExpiryPolicy(l.sourceSettings.Expiry)
+ l.pLink.Source().SetTimeout(l.sourceSettings.Timeout)
+ l.pLink.Source().SetDynamic(l.sourceSettings.Dynamic)
+
l.pLink.Target().SetAddress(l.target)
+ l.pLink.Target().SetDurability(l.targetSettings.Durability)
+ l.pLink.Target().SetExpiryPolicy(l.targetSettings.Expiry)
+ l.pLink.Target().SetTimeout(l.targetSettings.Timeout)
+ l.pLink.Target().SetDynamic(l.targetSettings.Dynamic)
+
l.pLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
l.pLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
l.pLink.Open()
@@ -184,16 +246,18 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti
func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
return linkSettings{
- isSender: pLink.IsSender(),
- source: pLink.RemoteSource().Address(),
- target: pLink.RemoteTarget().Address(),
- linkName: pLink.Name(),
- sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()),
- rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()),
- capacity: 1,
- prefetch: false,
- pLink: pLink,
- session: sn,
+ isSender: pLink.IsSender(),
+ source: pLink.RemoteSource().Address(),
+ sourceSettings: makeTerminusSettings(pLink.RemoteSource()),
+ target: pLink.RemoteTarget().Address(),
+ targetSettings: makeTerminusSettings(pLink.RemoteTarget()),
+ linkName: pLink.Name(),
+ sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()),
+ rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()),
+ capacity: 1,
+ prefetch: false,
+ pLink: pLink,
+ session: sn,
}
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38592d16/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
new file mode 100644
index 0000000..eb49c47
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Test that link settings are propagated correctly
+package electron
+
+import (
+ "net"
+ "qpid.apache.org/proton"
+ "testing"
+)
+
+func TestLinkSettings(t *testing.T) {
+ cConn, sConn := net.Pipe()
+ done := make(chan error)
+ // FIXME aconway 2017-02-23: bug in timeout conversion (pn_second_t)
+ settings := TerminusSettings{Durability: 1, Expiry: 2 /*, Timeout: 42 * time.Second*/, Dynamic: true}
+ go func() { // Server
+ close(done)
+ defer sConn.Close()
+ c, err := NewConnection(sConn, Server())
+ fatalIf(t, err)
+ for in := range c.Incoming() {
+ ep := in.Accept()
+ switch ep := ep.(type) {
+ case Receiver:
+ errorIf(t, checkEqual("one.source", ep.Source()))
+ errorIf(t, checkEqual(TerminusSettings{}, ep.SourceSettings()))
+ errorIf(t, checkEqual("one.target", ep.Target()))
+ errorIf(t, checkEqual(settings, ep.TargetSettings()))
+ case Sender:
+ errorIf(t, checkEqual("two", ep.LinkName()))
+ errorIf(t, checkEqual("two.source", ep.Source()))
+ errorIf(t, checkEqual(TerminusSettings{Durability: proton.Deliveries, Expiry: proton.ExpireNever}, ep.SourceSettings()))
+ }
+ }
+ }()
+
+ // Client
+ c, err := NewConnection(cConn)
+ fatalIf(t, err)
+ c.Sender(Source("one.source"), Target("one.target"), TargetSettings(settings))
+
+ c.Receiver(Source("two.source"), DurableSubscription("two"))
+ c.Close(nil)
+ <-done
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org