You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ac...@apache.org on 2015/11/23 18:58:12 UTC

[44/50] [abbrv] qpid-proton git commit: NO-JIRA: Go: Fix cleanup of sessions by connection

NO-JIRA: Go: Fix cleanup of sessions by connection

More consistent handling for endpoint close.

Add endpoint valid check in all injected functions to catch injected functions
executed after an endpoint close.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/380f81d0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/380f81d0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/380f81d0

Branch: refs/heads/go1
Commit: 380f81d054d983bbf30fd116b9ee86d80ef66788
Parents: d221b27
Author: Alan Conway <ac...@redhat.com>
Authored: Fri Nov 20 10:01:55 2015 -0500
Committer: Alan Conway <ac...@redhat.com>
Committed: Mon Nov 23 10:05:57 2015 -0500

----------------------------------------------------------------------
 .../go/src/qpid.apache.org/electron/doc.go      | 11 ++++--
 .../go/src/qpid.apache.org/electron/endpoint.go | 24 ++++++------
 .../go/src/qpid.apache.org/electron/handler.go  | 40 ++++++++++++++------
 .../go/src/qpid.apache.org/proton/wrappers.go   |  4 +-
 4 files changed, 52 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
index a484900..46bde37 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/doc.go
@@ -49,10 +49,15 @@ import "C"
 
 There is a single proton.Engine per connection, each driving it's own event-loop goroutine,
 and each with a 'handler'. Most state for a connection is maintained on the handler, and
-only accessed in the event-loop goroutine, so no locks are required.
+only accessed in the event-loop goroutine, so no locks are required there.
 
 The handler sets up channels as needed to get or send data from user goroutines
-using electron types like Sender or Receiver. We also use Engine.Inject to inject
-actions into the event loop from user goroutines.
+using electron types like Sender or Receiver.
+
+We also use Engine.Inject to inject actions into the event loop from user
+goroutines. It is important to check at the start of an injected function that
+required objects are still valid, for example a link may be remotely closed
+between the time a Sender function calls Inject and the time the injected
+function is executed by the handler goroutine. See comments in endpoint.go for more.
 
 */

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
index f04b240..8cbeadb 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/endpoint.go
@@ -54,6 +54,10 @@ type Endpoint interface {
 	Done() <-chan struct{}
 }
 
+// DEVELOPER NOTES
+//
+// An electron.Endpoint corresponds to a proton.Endpoint, which can be invalidated on close.
+//
 type endpoint struct {
 	err  proton.ErrorHolder
 	str  string // Must be set by the value that embeds endpoint.
@@ -62,10 +66,17 @@ type endpoint struct {
 
 func makeEndpoint(s string) endpoint { return endpoint{str: s, done: make(chan struct{})} }
 
-func (e *endpoint) closed(err error) {
+// Called in handler on a Closed event. Marks the endpoint as closed and the corresponding
+// proton.Endpoint pointer as invalid. Injected functions should check Error() to ensure
+// the pointer has not been invalidated.
+//
+// Returns the error stored on the endpoint, which may differ from err if there was
+// already an error.
+func (e *endpoint) closed(err error) error {
 	e.err.Set(err)
 	e.err.Set(Closed)
 	close(e.done)
+	return e.err.Get()
 }
 
 func (e *endpoint) String() string { return e.str }
@@ -74,19 +85,10 @@ func (e *endpoint) Error() error { return e.err.Get() }
 
 func (e *endpoint) Done() <-chan struct{} { return e.done }
 
-// Call in proton goroutine to close an endpoint locally
+// Call in proton goroutine to initiate closing an endpoint locally
 // handler will complete the close when remote end closes.
 func localClose(ep proton.Endpoint, err error) {
 	if ep.State().LocalActive() {
 		proton.CloseError(ep, err)
 	}
 }
-
-// Used to indicate that a channel has closed which normally is because the endpoint is closed.
-func errorOrClosed(e Endpoint) error {
-	if e.Error() != nil {
-		return e.Error()
-	} else {
-		return Closed
-	}
-}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
index 1586026..0237156 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/electron/handler.go
@@ -49,6 +49,7 @@ func newHandler(c *connection) *handler {
 	h.delegator.AutoOpen = false
 	return h
 }
+
 func (h *handler) linkError(l proton.Link, msg string) {
 	proton.CloseError(l, amqp.Errorf(amqp.InternalError, "%s for %s %s", msg, l.Type(), l))
 }
@@ -83,13 +84,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		}
 
 	case proton.MSessionClosed:
-		err := proton.EndpointError(e.Session())
-		for l, _ := range h.links {
-			if l.Session() == e.Session() {
-				h.linkClosed(l, err)
-			}
-		}
-		delete(h.sessions, e.Session())
+		h.sessionClosed(e.Session(), proton.EndpointError(e.Session()))
 
 	case proton.MLinkOpening:
 		l := e.Link()
@@ -117,7 +112,7 @@ func (h *handler) HandleMessagingEvent(t proton.MessagingEvent, e proton.Event)
 		h.connection.err.Set(e.Connection().RemoteCondition().Error())
 
 	case proton.MConnectionClosed:
-		h.connection.err.Set(Closed) // If no error already set, this is an orderly close.
+		h.connectionClosed(proton.EndpointError(e.Connection()))
 
 	case proton.MDisconnected:
 		h.connection.err.Set(e.Transport().Condition().Error())
@@ -157,13 +152,36 @@ func (h *handler) incoming(in Incoming) {
 	}
 }
 
+func (h *handler) addLink(pl proton.Link, el Link) {
+	h.links[pl] = el
+}
+
 func (h *handler) linkClosed(l proton.Link, err error) {
-	if link := h.links[l]; link != nil {
+	if link, ok := h.links[l]; ok {
 		link.closed(err)
 		delete(h.links, l)
 	}
 }
 
-func (h *handler) addLink(rl proton.Link, ll Link) {
-	h.links[rl] = ll
+func (h *handler) sessionClosed(ps proton.Session, err error) {
+	if s, ok := h.sessions[ps]; ok {
+		delete(h.sessions, ps)
+		err = s.closed(err)
+		for l, _ := range h.links {
+			if l.Session() == ps {
+				h.linkClosed(l, err)
+			}
+		}
+	}
+}
+
+func (h *handler) connectionClosed(err error) {
+	err = h.connection.closed(err)
+	// Close links first to avoid repeated scans of the link list by sessions.
+	for l, _ := range h.links {
+		h.linkClosed(l, err)
+	}
+	for s, _ := range h.sessions {
+		h.sessionClosed(s, err)
+	}
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/380f81d0/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
----------------------------------------------------------------------
diff --git a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
index 0b881c1..a48aeab 100644
--- a/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
+++ b/proton-c/bindings/go/src/qpid.apache.org/proton/wrappers.go
@@ -172,8 +172,8 @@ type Endpoint interface {
 	Type() string
 }
 
-// CloseError sets an error condition on an endpoint and closes the endpoint
-// if not already closed
+// CloseError sets an error condition (if err != nil) on an endpoint and closes
+// the endpoint if not already closed
 func CloseError(e Endpoint, err error) {
 	if err != nil {
 		e.Condition().SetError(err)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org