You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/23 16:15:04 UTC
qpid-proton git commit: NO-JIRA: Go: Fix cleanup of sessions by
connection
Repository: qpid-proton
Updated Branches:
refs/heads/master d221b2744 -> 380f81d05
NO-JIRA: Go: Fix cleanup of sessions by connection
More consistent handling for endpoint close.
Add endpoint valid check in all injected functions to catch injected functions
executed after an endpoint close.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/380f81d0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/380f81d0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/380f81d0
Branch: refs/heads/master
Commit: 380f81d054d983bbf30fd116b9ee86d80ef66788
Parents: d221b27
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Nov 20 10:01:55 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 23 10:05:57 2015 -0500
----------------------------------------------------------------------
.../go/src/qpid.apache.org/electron/doc.go | 11 ++++--
.../go/src/qpid.apache.org/electron/endpoint.go | 24 ++++++------
.../go/src/qpid.apache.org/electron/handler.go | 40 ++++++++++++++------
.../go/src/qpid.apache.org/proton/wrappers.go | 4 +-
4 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index a484900..46bde37 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -49,10 +49,15 @@ import "C"
There is a single proton.Engine per connection, each driving it's own event-loop goroutine,
and each with a 'handler'. Most state for a connection is maintained on the handler, and
-only accessed in the event-loop goroutine, so no locks are required.
+only accessed in the event-loop goroutine, so no locks are required there.
The handler sets up channels as needed to get or send data from user goroutines
-using electron types like Sender or Receiver. We also use Engine.Inject to inject
-actions into the event loop from user goroutines.
+using electron types like Sender or Receiver.
+
+We also use Engine.Inject to inject actions into the event loop from user
+goroutines. It is important to check at the start of an injected function that
+required objects are still valid, for example a link may be remotely closed
+between the time a Sender function calls Inject and the time the injected
+function is execute by the handler goroutine. See comments in endpoint.go for more.
*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index f04b240..8cbeadb 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -54,6 +54,10 @@ type Endpoint interface {
Done() <-chan struct{}
}
+// DEVELOPER NOTES
+//
+// An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated
+//
type endpoint struct {
err proton.ErrorHolder
str string // Must be set by the value that embeds endpoint.
@@ -62,10 +66,17 @@ type endpoint struct {
func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} }
-func (e *endpoint) closed(err error) {
+// Called in handler on a Closed event. Marks the endpoint as closed and the corresponding
+// proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure
+// the pointer has not been invalidated.
+//
+// Returns the error stored on the endpoint, which may not be different to err if there was
+// already a n error
+func (e *endpoint) closed(err error) error {
e.err.Set(err)
e.err.Set(Closed)
close(e.done)
+ return e.err.Get()
}
func (e *endpoint) String() string { return e.str }
@@ -74,19 +85,10 @@ func (e *endpoint) Error() error { return e.err.Get() }
func (e *endpoint) Done() <-chan struct{} { return e.done }
-// Call in proton goroutine to close an endpoint locally
+// Call in proton goroutine to initiate closing an endpoint locally
// handler will complete the close when remote end closes.
func localClose(ep proton.Endpoint, err error) {
if ep.State().LocalActive() {
proton.CloseError(ep, err)
}
}
-
-// Used to indicate that a channel has closed which normally is because the endpoint is closed.
-func errorOrClosed(e Endpoint) error {
- if e.Error() != nil {
- return e.Error()
- } else {
- return Closed
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 1586026..0237156 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -49,6 +49,7 @@ func newHandler(c *connection) *handler {
h.delegator.AutoOpen = false
return h
}
+
func (h *handler) linkError(l proton.Link, msg string) {
proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l))
}
@@ -83,13 +84,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
}
case proton.MSessionClosed:
- err := proton.EndpointError(e.Session())
- for l, _ := range h.links {
- if l.Session() == e.Session() {
- h.linkClosed(l, err)
- }
- }
- delete(h.sessions, e.Session())
+ h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
case proton.MLinkOpening:
l := e.Link()
@@ -117,7 +112,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
h.connection.err.Set(e.Connection().RemoteCondition().Error())
case proton.MConnectionClosed:
- h.connection.err.Set(Closed) // If no error already set, this is an orderly close.
+ h.connectionClosed(proton.EndpointError(e.Connection()))
case proton.MDisconnected:
h.connection.err.Set(e.Transport().Condition().Error())
@@ -157,13 +152,36 @@ func (h *handler) incoming(in Incoming) {
}
}
+func (h *handler) addLink(pl proton.Link, el Link) {
+ h.links[pl] = el
+}
+
func (h *handler) linkClosed(l proton.Link, err error) {
- if link := h.links[l]; link != nil {
+ if link, ok := h.links[l]; ok {
link.closed(err)
delete(h.links, l)
}
}
-func (h *handler) addLink(rl proton.Link, ll Link) {
- h.links[rl] = ll
+func (h *handler) sessionClosed(ps proton.Session, err error) {
+ if s, ok := h.sessions[ps]; ok {
+ delete(h.sessions, ps)
+ err = s.closed(err)
+ for l, _ := range h.links {
+ if l.Session() == ps {
+ h.linkClosed(l, err)
+ }
+ }
+ }
+}
+
+func (h *handler) connectionClosed(err error) {
+ err = h.connection.closed(err)
+ // Close links first to avoid repeated scans of the link list by sessions.
+ for l, _ := range h.links {
+ h.linkClosed(l, err)
+ }
+ for s, _ := range h.sessions {
+ h.sessionClosed(s, err)
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 0b881c1..a48aeab 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -172,8 +172,8 @@ type Endpoint interface {
Type() string
}
-// CloseError sets an error condition on an endpoint and closes the endpoint
-// if not already closed
+// CloseError sets an error condition (if err != nil) on an endpoint and closes
+// the endpoint if not already closed
func CloseError(e Endpoint, err error) {
if err != nil {
e.Condition().SetError(err)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org