You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2015/01/16 23:37:20 UTC
[1/2] qpid-jms git commit: Move Connection create to the test case
and ensure proper cleanup
Repository: qpid-jms
Updated Branches:
refs/heads/master 568270b78 -> 5ebe40f01
Move Connection create to the test case and ensure proper cleanup
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/db522529
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/db522529
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/db522529
Branch: refs/heads/master
Commit: db522529cba2e24007a0da50c85b74bd78567fc8
Parents: 568270b
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jan 16 17:33:29 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jan 16 17:33:29 2015 -0500
----------------------------------------------------------------------
.../jms/JmsConnectionConcurrentCloseCallsTest.java | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/db522529/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
index 4bf4f73..8a5b31c 100644
--- a/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
+++ b/qpid-jms-interop-tests/qpid-jms-activemq-tests/src/test/java/org/apache/qpid/jms/JmsConnectionConcurrentCloseCallsTest.java
@@ -26,7 +26,6 @@ import java.util.concurrent.TimeUnit;
import javax.jms.Session;
-import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.support.AmqpTestSupport;
import org.junit.Test;
@@ -41,15 +40,16 @@ public class JmsConnectionConcurrentCloseCallsTest extends AmqpTestSupport {
super.setUp();
executor = Executors.newFixedThreadPool(20);
- connection = (JmsConnection) createAmqpConnection();
- connection.start();
}
@Override
public void tearDown() throws Exception {
- if (connection.isStarted()) {
- connection.stop();
- }
+ try {
+ if (connection != null) {
+ connection.close();
+ }
+ } catch (Exception ex) {}
+
if (executor != null) {
executor.shutdownNow();
}
@@ -59,6 +59,8 @@ public class JmsConnectionConcurrentCloseCallsTest extends AmqpTestSupport {
@Test(timeout=200000)
public void testCloseMultipleTimes() throws Exception {
+ connection = (JmsConnection) createAmqpConnection();
+ connection.start();
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
assertTrue(connection.isStarted());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/2] qpid-jms git commit: Remove the abstract provider class as we
only use it in one place and improve the naming of the serializer threads for
the AmqpProvider.
Posted by ta...@apache.org.
Remove the abstract provider class as we only use it in one place and
improve the naming of the serializer threads for the AmqpProvider.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/5ebe40f0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/5ebe40f0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/5ebe40f0
Branch: refs/heads/master
Commit: 5ebe40f012ff92286683ab81a56ad7b46fc30d09
Parents: db52252
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Jan 16 17:37:02 2015 -0500
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Jan 16 17:37:02 2015 -0500
----------------------------------------------------------------------
.../qpid/jms/provider/AbstractProvider.java | 100 -------------------
.../qpid/jms/provider/amqp/AmqpProvider.java | 79 ++++++++++++++-
2 files changed, 76 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5ebe40f0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
deleted file mode 100644
index 9856aa8..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/AbstractProvider.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.provider;
-
-import java.io.IOException;
-import java.net.URI;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.qpid.jms.util.IOExceptionSupport;
-
-/**
- * Base class used to implement the most common features of a Provider instance..
- *
- * Methods that are fully optional such as transaction commit and rollback are implemented
- * here to throw an UnsupportedOperationException.
- */
-public abstract class AbstractProvider implements Provider {
-
- protected final URI remoteURI;
- protected final AtomicBoolean closed = new AtomicBoolean();
- protected final ScheduledExecutorService serializer;
-
- protected ProviderListener listener;
-
- public AbstractProvider(URI remoteURI) {
- this.remoteURI = remoteURI;
-
- this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
-
- @Override
- public Thread newThread(Runnable runner) {
- Thread serial = new Thread(runner);
- serial.setDaemon(true);
- serial.setName(toString());
- return serial;
- }
- });
- }
-
- @Override
- public void start() throws IOException, IllegalStateException {
- checkClosed();
-
- if (listener == null) {
- throw new IllegalStateException("No ProviderListener registered.");
- }
- }
-
- @Override
- public void setProviderListener(ProviderListener listener) {
- this.listener = listener;
- }
-
- @Override
- public ProviderListener getProviderListener() {
- return listener;
- }
-
- @Override
- public URI getRemoteURI() {
- return remoteURI;
- }
-
- public void fireConnectionEstablished() {
- ProviderListener listener = this.listener;
- if (listener != null) {
- listener.onConnectionEstablished(remoteURI);
- }
- }
-
- public void fireProviderException(Throwable ex) {
- ProviderListener listener = this.listener;
- if (listener != null) {
- listener.onConnectionFailure(IOExceptionSupport.create(ex));
- }
- }
-
- protected void checkClosed() throws ProviderClosedException {
- if (closed.get()) {
- throw new ProviderClosedException("The Provider is already closed");
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/5ebe40f0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index 61e84e7..dea1750 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -23,7 +23,12 @@ import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.JMSException;
@@ -42,10 +47,12 @@ import org.apache.qpid.jms.meta.JmsResourceVistor;
import org.apache.qpid.jms.meta.JmsSessionId;
import org.apache.qpid.jms.meta.JmsSessionInfo;
import org.apache.qpid.jms.meta.JmsTransactionInfo;
-import org.apache.qpid.jms.provider.AbstractProvider;
import org.apache.qpid.jms.provider.AsyncResult;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderClosedException;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.transports.TransportFactory;
import org.apache.qpid.jms.transports.TransportListener;
import org.apache.qpid.jms.util.IOExceptionSupport;
@@ -74,7 +81,7 @@ import org.slf4j.LoggerFactory;
* All work within this Provider is serialized to a single Thread. Any asynchronous exceptions
* will be dispatched from that Thread and all in-bound requests are handled there as well.
*/
-public class AmqpProvider extends AbstractProvider implements TransportListener {
+public class AmqpProvider implements Provider, TransportListener {
private static final Logger LOG = LoggerFactory.getLogger(AmqpProvider.class);
@@ -85,7 +92,9 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
// brokers that don't currently handle the unsigned range well.
private static final int DEFAULT_CHANNEL_MAX = 32767;
private static final String DEFAULT_TRANSPORT_KEY = "tcp";
+ private static final AtomicInteger PROVIDER_SEQUENCE = new AtomicInteger();
+ private ProviderListener listener;
private AmqpConnection connection;
private org.apache.qpid.jms.transports.Transport transport;
private String transportKey = DEFAULT_TRANSPORT_KEY;
@@ -99,6 +108,9 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
private int channelMax = DEFAULT_CHANNEL_MAX;
+ private final URI remoteURI;
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final ScheduledExecutorService serializer;
private final Transport protonTransport = Transport.Factory.create();
private final Collector protonCollector = new CollectorImpl();
@@ -119,7 +131,22 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
* The URI of the AMQP broker this Provider instance will connect to.
*/
public AmqpProvider(URI remoteURI, Map<String, String> extraOptions) {
- super(remoteURI);
+ this.remoteURI = remoteURI;
+
+ this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable runner) {
+ Thread serial = new Thread(runner);
+ serial.setDaemon(true);
+ serial.setName(AmqpProvider.this.getClass().getSimpleName() + ":(" +
+ PROVIDER_SEQUENCE.incrementAndGet() + "):[" +
+ getRemoteURI() + "]");
+ LOG.info("Provider thread name: {}", serial.getName());
+ return serial;
+ }
+ });
+
updateTracer();
}
@@ -137,6 +164,15 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
}
@Override
+ public void start() throws IOException, IllegalStateException {
+ checkClosed();
+
+ if (listener == null) {
+ throw new IllegalStateException("No ProviderListener registered.");
+ }
+ }
+
+ @Override
public void close() {
if (closed.compareAndSet(false, true)) {
final ProviderFuture request = new ProviderFuture();
@@ -582,6 +618,8 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
});
}
+ //---------- Event handlers and Utility methods -------------------------//
+
private void updateTracer() {
if (isTraceFrames()) {
((TransportImpl) protonTransport).setProtocolTracer(new ProtocolTracer() {
@@ -761,6 +799,26 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
}
}
+ void fireConnectionEstablished() {
+ ProviderListener listener = this.listener;
+ if (listener != null) {
+ listener.onConnectionEstablished(remoteURI);
+ }
+ }
+
+ void fireProviderException(Throwable ex) {
+ ProviderListener listener = this.listener;
+ if (listener != null) {
+ listener.onConnectionFailure(IOExceptionSupport.create(ex));
+ }
+ }
+
+ private void checkClosed() throws ProviderClosedException {
+ if (closed.get()) {
+ throw new ProviderClosedException("This Provider is already closed");
+ }
+ }
+
//---------- Property Setters and Getters --------------------------------//
@Override
@@ -878,4 +936,19 @@ public class AmqpProvider extends AbstractProvider implements TransportListener
void setTransportKey(String transportKey) {
this.transportKey = transportKey;
}
+
+ @Override
+ public void setProviderListener(ProviderListener listener) {
+ this.listener = listener;
+ }
+
+ @Override
+ public ProviderListener getProviderListener() {
+ return listener;
+ }
+
+ @Override
+ public URI getRemoteURI() {
+ return remoteURI;
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org