You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/27 22:14:08 UTC

[2/3] qpid-jms git commit: Clean up a bit and remove some TODO values that are just notes at this point. Dispatch connection established on the executor so that client work that uses the established connection doesn't hold the provider thread hostage.

Clean up a bit and remove some TODO markers from comments that are just
notes at this point.  Dispatch the connection established callbacks on the
executor so that client work that uses the established connection doesn't
hold the provider thread hostage.  Tag the executor thread with the
connection ID for easier correlation.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6d501765
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6d501765
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6d501765

Branch: refs/heads/master
Commit: 6d50176541b2369e8a90624d634f640581ac2d0e
Parents: 95a1eb3
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jan 27 15:46:58 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jan 27 15:46:58 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java | 33 +++++++++++++-------
 1 file changed, 21 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6d501765/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index 1eaada5..dd4dbb6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -111,7 +111,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
     private final AtomicLong transactionIdGenerator = new AtomicLong();
     private JmsMessageFactory messageFactory;
 
-    protected JmsConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
+    protected JmsConnection(final String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
 
         // This executor can be used for dispatching asynchronous tasks that might block or result
         // in reentrant calls to this Connection that could block.  The thread in this executor
@@ -120,7 +120,8 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
         executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
             @Override
             public Thread newThread(Runnable r) {
-                Thread thread = new Thread(r, "QpidJMS Connection Executor: ");
+                Thread thread = new Thread(r, "QpidJMS Connection Executor: " + connectionId);
+                thread.setDaemon(false);
                 return thread;
             }
         });
@@ -176,7 +177,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
                         try {
                             request.sync();
                         } catch (Exception ex) {
-                            // TODO - Spec is a bit vague here, we don't fail if already closed but
+                            // NOTE - Spec is a bit vague here, we don't fail if already closed but
                             //        in this case we really aren't closed yet so there could be an
                             //        argument that at this point an exception is still valid.
                             if (ex.getCause() instanceof InterruptedException) {
@@ -219,7 +220,7 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
      */
     protected void shutdown() throws JMSException {
 
-        // TODO - Once ConnectionConsumer is added we must shutdown those as well.
+        // NOTE - Once ConnectionConsumer is added we must shutdown those as well.
 
         for (JmsSession session : this.sessions) {
             session.shutdown();
@@ -334,9 +335,12 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
         this.connectionInfo.setClientId(clientID);
         this.clientIdSet = true;
 
-        //We weren't connected if we got this far, we should now connect now to ensure the clientID is valid.
-        //TODO: determine if any resulting failure is only the result of the ClientID value, or other reasons such as auth.
+        // We weren't connected if we got this far, we should now connect to ensure the
+        // configured clientID is valid.
         connect();
+
+        // TODO: determine if any resulting failure is only the result of the ClientID value,
+        //       or other reasons such as auth.  (Provider should have thrown the correct error)
     }
 
     /**
@@ -1039,15 +1043,20 @@ public class JmsConnection implements Connection, TopicConnection, QueueConnecti
     }
 
     @Override
-    public void onConnectionEstablished(URI remoteURI) {
+    public void onConnectionEstablished(final URI remoteURI) {
         LOG.info("Connection {} connected to remote Broker: {}", connectionInfo.getConnectionId(), remoteURI);
         this.messageFactory = provider.getMessageFactory();
 
-        // TODO - For events triggered from the Provider thread, we might want to consider always
-        //        firing the client level events on the Connection executor to prevent the client
-        //        from stalling the provider thread.
-        for (JmsConnectionListener listener : connectionListeners) {
-            listener.onConnectionEstablished(remoteURI);
+        // Run the callbacks on the connection executor to allow the provider to return
+        // to its normal processing without waiting for client level processing to finish.
+        for (final JmsConnectionListener listener : connectionListeners) {
+            executor.submit(new Runnable() {
+
+                @Override
+                public void run() {
+                    listener.onConnectionEstablished(remoteURI);
+                }
+            });
         }
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org