You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/16 23:37:20 UTC

[1/2] qpid-jms git commit: Move Connection create to the test case and ensure proper cleanup

Repository: qpid-jms
Updated Branches:
  refs/heads/master 568270b78 -> 5ebe40f01


Move Connection creation to the test case and ensure proper cleanup

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/db522529
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/db522529
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/db522529

Branch: refs/heads/master
Commit: db522529cba2e24007a0da50c85b74bd78567fc8
Parents: 568270b
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jan 16 17:33:29 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jan 16 17:33:29 2015 -0500

----------------------------------------------------------------------
 .../jms/JmsConnectionConcurrentCloseCallsTest.java    | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/db522529/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
index 4bf4f73..8a5b31c 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.jms.Session;
 
-import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.support.AmqpTestSupport;
 import org.junit.Test;
 
@@ -41,15 +40,16 @@ public class JmsConnectionConcurrentCloseCallsTest extends AmqpTestSupport {
         super.setUp();
 
         executor = Executors.newFixedThreadPool(20);
-        connection = (JmsConnection) createAmqpConnection();
-        connection.start();
     }
 
     @Override
     public void tearDown() throws Exception {
-        if (connection.isStarted()) {
-            connection.stop();
-        }
+        try {
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (Exception ex) {}
+
         if (executor != null) {
             executor.shutdownNow();
         }
@@ -59,6 +59,8 @@ public class JmsConnectionConcurrentCloseCallsTest extends AmqpTestSupport {
 
     @Test(timeout=200000)
     public void testCloseMultipleTimes() throws Exception {
+        connection = (JmsConnection) createAmqpConnection();
+        connection.start();
         connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
         assertTrue(connection.isStarted());


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: Remove the abstract provider class as we only use it in one place and improve the naming of the serializer threads for the AmqpProvider.

Posted by ta...@apache.org.
Remove the AbstractProvider class, since it is only used in one place, and
improve the naming of the serializer threads for the AmqpProvider.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/5ebe40f0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/5ebe40f0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/5ebe40f0

Branch: refs/heads/master
Commit: 5ebe40f012ff92286683ab81a56ad7b46fc30d09
Parents: db52252
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jan 16 17:37:02 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jan 16 17:37:02 2015 -0500

----------------------------------------------------------------------
 .../qpid/jms/provider/AbstractProvider.java     | 100 -------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    |  79 ++++++++++++++-
 2 files changed, 76 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5ebe40f0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
deleted file mode 100644
index 9856aa8..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.qpid.jms.util.IOExceptionSupport;
-
-/**
- * Base class used to implement the most common features of a Provider instance..
- *
- * Methods that are fully optional such as transaction commit and rollback are implemented
- * here to throw an UnsupportedOperationException.
- */
-public abstract class AbstractProvider implements Provider {
-
-    protected final URI remoteURI;
-    protected final AtomicBoolean closed = new AtomicBoolean();
-    protected final ScheduledExecutorService serializer;
-
-    protected ProviderListener listener;
-
-    public AbstractProvider(URI remoteURI) {
-        this.remoteURI = remoteURI;
-
-        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-
-            @Override
-            public Thread newThread(Runnable runner) {
-                Thread serial = new Thread(runner);
-                serial.setDaemon(true);
-                serial.setName(toString());
-                return serial;
-            }
-        });
-    }
-
-    @Override
-    public void start() throws IOException, IllegalStateException {
-        checkClosed();
-
-        if (listener == null) {
-            throw new IllegalStateException("No ProviderListener registered.");
-        }
-    }
-
-    @Override
-    public void setProviderListener(ProviderListener listener) {
-        this.listener = listener;
-    }
-
-    @Override
-    public ProviderListener getProviderListener() {
-        return listener;
-    }
-
-    @Override
-    public URI getRemoteURI() {
-        return remoteURI;
-    }
-
-    public void fireConnectionEstablished() {
-        ProviderListener listener = this.listener;
-        if (listener != null) {
-            listener.onConnectionEstablished(remoteURI);
-        }
-    }
-
-    public void fireProviderException(Throwable ex) {
-        ProviderListener listener = this.listener;
-        if (listener != null) {
-            listener.onConnectionFailure(IOExceptionSupport.create(ex));
-        }
-    }
-
-    protected void checkClosed() throws ProviderClosedException {
-        if (closed.get()) {
-            throw new ProviderClosedException("The Provider is already closed");
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5ebe40f0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 61e84e7..dea1750 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -23,7 +23,12 @@ import java.io.IOException;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.JMSException;
 
@@ -42,10 +47,12 @@ import org.apache.qpid.jms.meta.JmsResourceVistor;
 import org.apache.qpid.jms.meta.JmsSessionId;
 import org.apache.qpid.jms.meta.JmsSessionInfo;
 import org.apache.qpid.jms.meta.JmsTransactionInfo;
-import org.apache.qpid.jms.provider.AbstractProvider;
 import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderClosedException;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
 import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.transports.TransportFactory;
 import org.apache.qpid.jms.transports.TransportListener;
 import org.apache.qpid.jms.util.IOExceptionSupport;
@@ -74,7 +81,7 @@ import org.slf4j.LoggerFactory;
  * All work within this Provider is serialized to a single Thread.  Any asynchronous exceptions
  * will be dispatched from that Thread and all in-bound requests are handled there as well.
  */
-public class AmqpProvider extends AbstractProvider implements TransportListener {
+public class AmqpProvider implements Provider, TransportListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(AmqpProvider.class);
 
@@ -85,7 +92,9 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
     //       brokers that don't currently handle the unsigned range well.
     private static final int DEFAULT_CHANNEL_MAX = 32767;
     private static final String DEFAULT_TRANSPORT_KEY = "tcp";
+    private static final AtomicInteger PROVIDER_SEQUENCE = new AtomicInteger();
 
+    private ProviderListener listener;
     private AmqpConnection connection;
     private org.apache.qpid.jms.transports.Transport transport;
     private String transportKey = DEFAULT_TRANSPORT_KEY;
@@ -99,6 +108,9 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
     private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
     private int channelMax = DEFAULT_CHANNEL_MAX;
 
+    private final URI remoteURI;
+    private final AtomicBoolean closed = new AtomicBoolean();
+    private final ScheduledExecutorService serializer;
     private final Transport protonTransport = Transport.Factory.create();
     private final Collector protonCollector = new CollectorImpl();
 
@@ -119,7 +131,22 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
      *        The URI of the AMQP broker this Provider instance will connect to.
      */
     public AmqpProvider(URI remoteURI, Map<String, String> extraOptions) {
-        super(remoteURI);
+        this.remoteURI = remoteURI;
+
+        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread serial = new Thread(runner);
+                serial.setDaemon(true);
+                serial.setName(AmqpProvider.this.getClass().getSimpleName() + ":(" +
+                               PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
+                               getRemoteURI() + "]");
+                LOG.info("Provider thread name: {}", serial.getName());
+                return serial;
+            }
+        });
+
         updateTracer();
     }
 
@@ -137,6 +164,15 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
     }
 
     @Override
+    public void start() throws IOException, IllegalStateException {
+        checkClosed();
+
+        if (listener == null) {
+            throw new IllegalStateException("No ProviderListener registered.");
+        }
+    }
+
+    @Override
     public void close() {
         if (closed.compareAndSet(false, true)) {
             final ProviderFuture request = new ProviderFuture();
@@ -582,6 +618,8 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
         });
     }
 
+    //---------- Event handlers and Utility methods  -------------------------//
+
     private void updateTracer() {
         if (isTraceFrames()) {
             ((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@@ -761,6 +799,26 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
         }
     }
 
+    void fireConnectionEstablished() {
+        ProviderListener listener = this.listener;
+        if (listener != null) {
+            listener.onConnectionEstablished(remoteURI);
+        }
+    }
+
+    void fireProviderException(Throwable ex) {
+        ProviderListener listener = this.listener;
+        if (listener != null) {
+            listener.onConnectionFailure(IOExceptionSupport.create(ex));
+        }
+    }
+
+    private void checkClosed() throws ProviderClosedException {
+        if (closed.get()) {
+            throw new ProviderClosedException("This Provider is already closed");
+        }
+    }
+
     //---------- Property Setters and Getters --------------------------------//
 
     @Override
@@ -878,4 +936,19 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
     void setTransportKey(String transportKey) {
         this.transportKey = transportKey;
     }
+
+    @Override
+    public void setProviderListener(ProviderListener listener) {
+        this.listener = listener;
+    }
+
+    @Override
+    public ProviderListener getProviderListener() {
+        return listener;
+    }
+
+    @Override
+    public URI getRemoteURI() {
+        return remoteURI;
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org