You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2017/02/23 22:43:17 UTC

[4/6] qpid-proton git commit: PROTON-1415: go binding does not create durable subscriber

PROTON-1415: go binding does not create durable subscriber

Added `DurableSubscription(name string) LinkOption`, pass to Connection.Receiver()
to create a named durable subscription.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/38592d16
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/38592d16
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/38592d16

Branch: refs/heads/master
Commit: 38592d1668eef9d192ed0c814b34e807039777d3
Parents: 540e74e
Author: Alan Conway <ac...@redhat.com>
Authored: Thu Feb 23 14:02:14 2017 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Thu Feb 23 15:30:05 2017 -0500

----------------------------------------------------------------------
 .../src/qpid.apache.org/electron/connection.go  |   2 +-
 .../go/src/qpid.apache.org/electron/link.go     | 128 ++++++++++++++-----
 .../src/qpid.apache.org/electron/link_test.go   |  63 +++++++++
 3 files changed, 160 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38592d16/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
index 7f3050f..8f62491 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/connection.go
@@ -276,7 +276,7 @@ func (c *connection) WaitTimeout(timeout time.Duration) error {
 }
 
 func (c *connection) Incoming() <-chan Incoming {
-	assert(c.incoming != nil, "electron.Connection.Incoming() disabled for %s", c)
+	assert(c.incoming != nil, "Incoming() is only allowed for a Connection created with the Server() option: %s", c)
 	return c.incoming
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38592d16/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
index 1d17894..4f927c1 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link.go
@@ -22,6 +22,7 @@ package electron
 import (
 	"fmt"
 	"qpid.apache.org/proton"
+	"time"
 )
 
 // Settings associated with a link
@@ -50,6 +51,12 @@ type LinkSettings interface {
 
 	// Session containing the Link
 	Session() Session
+
+	// Advanced settings for the source
+	SourceSettings() TerminusSettings
+
+	// Advanced settings for the target
+	TargetSettings() TerminusSettings
 }
 
 // LinkOption can be passed when creating a sender or receiver link to set optional configuration.
@@ -62,7 +69,7 @@ func Source(s string) LinkOption { return func(l *linkSettings) { l.source = s }
 func Target(s string) LinkOption { return func(l *linkSettings) { l.target = s } }
 
 // LinkName returns a LinkOption that sets the link name.
-func LinkName(s string) LinkOption { return func(l *linkSettings) { l.target = s } }
+func LinkName(s string) LinkOption { return func(l *linkSettings) { l.linkName = s } }
 
 // SndSettle returns a LinkOption that sets the send settle mode
 func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sndSettle = m } }
@@ -70,16 +77,23 @@ func SndSettle(m SndSettleMode) LinkOption { return func(l *linkSettings) { l.sn
 // RcvSettle returns a LinkOption that sets the send settle mode
 func RcvSettle(m RcvSettleMode) LinkOption { return func(l *linkSettings) { l.rcvSettle = m } }
 
-// SndSettleMode returns a LinkOption that defines when the sending end of the
-// link settles message delivery.
-type SndSettleMode proton.SndSettleMode
-
 // Capacity returns a LinkOption that sets the link capacity
 func Capacity(n int) LinkOption { return func(l *linkSettings) { l.capacity = n } }
 
 // Prefetch returns a LinkOption that sets a receivers pre-fetch flag. Not relevant for a sender.
 func Prefetch(p bool) LinkOption { return func(l *linkSettings) { l.prefetch = p } }
 
+// DurableSubscription returns a LinkOption that configures a Receiver as a named durable
+// subscription.  The name overrides (and is overridden by) LinkName() so you should normally
+// only use one of these options.
+func DurableSubscription(name string) LinkOption {
+	return func(l *linkSettings) {
+		l.linkName = name
+		l.sourceSettings.Durability = proton.Deliveries
+		l.sourceSettings.Expiry = proton.ExpireNever
+	}
+}
+
 // AtMostOnce returns a LinkOption that sets "fire and forget" mode, messages
 // are sent but no acknowledgment is received, messages can be lost if there is
 // a network failure. Sets SndSettleMode=SendSettled and RcvSettleMode=RcvFirst
@@ -102,6 +116,21 @@ func AtLeastOnce() LinkOption {
 	}
 }
 
+// SourceSettings returns a LinkOption that sets all the SourceSettings.
+// Note: it will override the source address set by a Source() option
+func SourceSettings(ts TerminusSettings) LinkOption {
+	return func(l *linkSettings) { l.sourceSettings = ts }
+}
+
+// TargetSettings returns a LinkOption that sets all the TargetSettings.
+// Note: it will override the target address set by a Target() option
+func TargetSettings(ts TerminusSettings) LinkOption {
+	return func(l *linkSettings) { l.targetSettings = ts }
+}
+
+// SndSettleMode defines when the sending end of the link settles message delivery.
+type SndSettleMode proton.SndSettleMode
+
 const (
 	// Messages are sent unsettled
 	SndUnsettled = SndSettleMode(proton.SndUnsettled)
@@ -122,16 +151,37 @@ const (
 )
 
 type linkSettings struct {
-	source    string
-	target    string
-	linkName  string
-	isSender  bool
-	sndSettle SndSettleMode
-	rcvSettle RcvSettleMode
-	capacity  int
-	prefetch  bool
-	session   *session
-	pLink     proton.Link
+	source         string
+	sourceSettings TerminusSettings
+	target         string
+	targetSettings TerminusSettings
+	linkName       string
+	isSender       bool
+	sndSettle      SndSettleMode
+	rcvSettle      RcvSettleMode
+	capacity       int
+	prefetch       bool
+	session        *session
+	pLink          proton.Link
+}
+
+// Advanced AMQP settings for the source or target of a link.
+// Usually these can be set via a more descriptive LinkOption, e.g. DurableSubscription()
+// and do not need to be set/examined directly.
+type TerminusSettings struct {
+	Durability proton.Durability
+	Expiry     proton.ExpiryPolicy
+	Timeout    time.Duration
+	Dynamic    bool
+}
+
+func makeTerminusSettings(t proton.Terminus) TerminusSettings {
+	return TerminusSettings{
+		Durability: t.Durability(),
+		Expiry:     t.ExpiryPolicy(),
+		Timeout:    t.Timeout(),
+		Dynamic:    t.IsDynamic(),
+	}
 }
 
 type link struct {
@@ -139,13 +189,15 @@ type link struct {
 	linkSettings
 }
 
-func (l *linkSettings) Source() string           { return l.source }
-func (l *linkSettings) Target() string           { return l.target }
-func (l *linkSettings) LinkName() string         { return l.linkName }
-func (l *linkSettings) IsSender() bool           { return l.isSender }
-func (l *linkSettings) IsReceiver() bool         { return !l.isSender }
-func (l *linkSettings) SndSettle() SndSettleMode { return l.sndSettle }
-func (l *linkSettings) RcvSettle() RcvSettleMode { return l.rcvSettle }
+func (l *linkSettings) Source() string                   { return l.source }
+func (l *linkSettings) Target() string                   { return l.target }
+func (l *linkSettings) LinkName() string                 { return l.linkName }
+func (l *linkSettings) IsSender() bool                   { return l.isSender }
+func (l *linkSettings) IsReceiver() bool                 { return !l.isSender }
+func (l *linkSettings) SndSettle() SndSettleMode         { return l.sndSettle }
+func (l *linkSettings) RcvSettle() RcvSettleMode         { return l.rcvSettle }
+func (l *linkSettings) SourceSettings() TerminusSettings { return l.sourceSettings }
+func (l *linkSettings) TargetSettings() TerminusSettings { return l.targetSettings }
 
 func (l *link) Session() Session       { return l.session }
 func (l *link) Connection() Connection { return l.session.Connection() }
@@ -175,7 +227,17 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti
 		return l, fmt.Errorf("cannot create link %s", l.pLink)
 	}
 	l.pLink.Source().SetAddress(l.source)
+	l.pLink.Source().SetDurability(l.sourceSettings.Durability)
+	l.pLink.Source().SetExpiryPolicy(l.sourceSettings.Expiry)
+	l.pLink.Source().SetTimeout(l.sourceSettings.Timeout)
+	l.pLink.Source().SetDynamic(l.sourceSettings.Dynamic)
+
 	l.pLink.Target().SetAddress(l.target)
+	l.pLink.Target().SetDurability(l.targetSettings.Durability)
+	l.pLink.Target().SetExpiryPolicy(l.targetSettings.Expiry)
+	l.pLink.Target().SetTimeout(l.targetSettings.Timeout)
+	l.pLink.Target().SetDynamic(l.targetSettings.Dynamic)
+
 	l.pLink.SetSndSettleMode(proton.SndSettleMode(l.sndSettle))
 	l.pLink.SetRcvSettleMode(proton.RcvSettleMode(l.rcvSettle))
 	l.pLink.Open()
@@ -184,16 +246,18 @@ func makeLocalLink(sn *session, isSender bool, setting ...LinkOption) (linkSetti
 
 func makeIncomingLinkSettings(pLink proton.Link, sn *session) linkSettings {
 	return linkSettings{
-		isSender:  pLink.IsSender(),
-		source:    pLink.RemoteSource().Address(),
-		target:    pLink.RemoteTarget().Address(),
-		linkName:  pLink.Name(),
-		sndSettle: SndSettleMode(pLink.RemoteSndSettleMode()),
-		rcvSettle: RcvSettleMode(pLink.RemoteRcvSettleMode()),
-		capacity:  1,
-		prefetch:  false,
-		pLink:     pLink,
-		session:   sn,
+		isSender:       pLink.IsSender(),
+		source:         pLink.RemoteSource().Address(),
+		sourceSettings: makeTerminusSettings(pLink.RemoteSource()),
+		target:         pLink.RemoteTarget().Address(),
+		targetSettings: makeTerminusSettings(pLink.RemoteTarget()),
+		linkName:       pLink.Name(),
+		sndSettle:      SndSettleMode(pLink.RemoteSndSettleMode()),
+		rcvSettle:      RcvSettleMode(pLink.RemoteRcvSettleMode()),
+		capacity:       1,
+		prefetch:       false,
+		pLink:          pLink,
+		session:        sn,
 	}
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/38592d16/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
new file mode 100644
index 0000000..eb49c47
--- /dev/null
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/link_test.go
@@ -0,0 +1,63 @@
+/*
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+*/
+
+// Test that link settings are propagated correctly
+package electron
+
+import (
+	"net"
+	"qpid.apache.org/proton"
+	"testing"
+)
+
+func TestLinkSettings(t *testing.T) {
+	cConn, sConn := net.Pipe()
+	done := make(chan error)
+	// FIXME aconway 2017-02-23: bug in timeout conversion (pn_second_t)
+	settings := TerminusSettings{Durability: 1, Expiry: 2 /*, Timeout: 42 * time.Second*/, Dynamic: true}
+	go func() { // Server
+		close(done)
+		defer sConn.Close()
+		c, err := NewConnection(sConn, Server())
+		fatalIf(t, err)
+		for in := range c.Incoming() {
+			ep := in.Accept()
+			switch ep := ep.(type) {
+			case Receiver:
+				errorIf(t, checkEqual("one.source", ep.Source()))
+				errorIf(t, checkEqual(TerminusSettings{}, ep.SourceSettings()))
+				errorIf(t, checkEqual("one.target", ep.Target()))
+				errorIf(t, checkEqual(settings, ep.TargetSettings()))
+			case Sender:
+				errorIf(t, checkEqual("two", ep.LinkName()))
+				errorIf(t, checkEqual("two.source", ep.Source()))
+				errorIf(t, checkEqual(TerminusSettings{Durability: proton.Deliveries, Expiry: proton.ExpireNever}, ep.SourceSettings()))
+			}
+		}
+	}()
+
+	// Client
+	c, err := NewConnection(cConn)
+	fatalIf(t, err)
+	c.Sender(Source("one.source"), Target("one.target"), TargetSettings(settings))
+
+	c.Receiver(Source("two.source"), DurableSubscription("two"))
+	c.Close(nil)
+	<-done
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org