You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2018/10/26 20:50:42 UTC

[1/2] qpid-jms git commit: QPIDJMS-420 Allow for direct dispatch to target JMS MessageConsumer

Repository: qpid-jms
Updated Branches:
  refs/heads/master 50118e010 -> d96734e0f


QPIDJMS-420 Allow for direct dispatch to target JMS MessageConsumer

Allow the connection to be provided a direct path to the targeted
consumer of an inbound message as opposed to looking one up via the
session map and then the session's consumer map.  Preserves the legacy
lookups for compatibility with connection consumer paths.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/56b24dc4
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/56b24dc4
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/56b24dc4

Branch: refs/heads/master
Commit: 56b24dc43a58bdfdfdd7ed18d34d51db5c3b66e7
Parents: 50118e0
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 26 15:55:08 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 26 15:55:35 2018 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/JmsConnection.java | 31 ++++++++---------
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  2 +-
 .../apache/qpid/jms/meta/JmsConsumerInfo.java   | 12 +++++--
 .../qpid/jms/provider/amqp/AmqpConsumer.java    | 35 +++++++++-----------
 .../qpid/jms/meta/JmsConsumerInfoTest.java      | 33 +++++++++---------
 .../jms/meta/JmsDefaultResourceVisitorTest.java |  2 +-
 .../amqp/AmqpSubscriptionTrackerTest.java       |  2 +-
 .../provider/amqp/message/AmqpCodecTest.java    |  2 +-
 .../message/AmqpJmsMessageTypesTestCase.java    |  2 +-
 .../failover/FailoverProviderTestSupport.java   |  2 +-
 10 files changed, 64 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index cf78fe0..a3537e3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -469,7 +469,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
             messageQueue = new FifoMessageQueue(configuredPrefetch);
         }
 
-        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(getNextConnectionConsumerId(), messageQueue);
+        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(getNextConnectionConsumerId(), messageQueue, null);
         consumerInfo.setExplicitClientID(isExplicitClientID());
         consumerInfo.setSelector(messageSelector);
         consumerInfo.setDurable(durable);
@@ -1177,36 +1177,37 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
 
     @Override
     public void onInboundMessage(final JmsInboundMessageDispatch envelope) {
-
         JmsMessage incoming = envelope.getMessage();
         if (incoming != null) {
-            // Ensure incoming Messages are in readonly mode.
+            // Ensure incoming Messages are in read-only mode and configured properly
             incoming.setReadOnlyBody(true);
             incoming.setReadOnlyProperties(true);
-
             incoming.setValidatePropertyNames(isValidatePropertyNames());
         }
 
-        JmsMessageDispatcher dispatcher = sessions.get(envelope.getConsumerId().getParentId());
+        JmsMessageDispatcher dispatcher = null;
+
+        if (envelope.getConsumerInfo() != null && envelope.getConsumerInfo().getDispatcher() != null) {
+            dispatcher = envelope.getConsumerInfo().getDispatcher();
+        } else {
+            dispatcher = sessions.get(envelope.getConsumerId().getParentId());
+            if (dispatcher == null) {
+                dispatcher = connectionConsumers.get(envelope.getConsumerId());
+            }
+        }
+
         if (dispatcher != null) {
             dispatcher.onInboundMessage(envelope);
         } else {
-            dispatcher = connectionConsumers.get(envelope.getConsumerId());
-            if (dispatcher != null) {
-                dispatcher.onInboundMessage(envelope);
-            }
+            LOG.debug("Message inbound with no dispatcher registered for its consumer: {}", envelope.getConsumerId());
         }
 
         // Run the application callbacks on the connection executor to allow the provider to
         // return to its normal processing without waiting for client level processing to finish.
         if (!connectionListeners.isEmpty()) {
             for (final JmsConnectionListener listener : connectionListeners) {
-                executor.submit(new Runnable() {
-
-                    @Override
-                    public void run() {
-                        listener.onInboundMessage(envelope);
-                    }
+                executor.submit(() -> {
+                    listener.onInboundMessage(envelope);
                 });
             }
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 98caf28..5662b2e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -100,7 +100,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
             this.messageQueue = new FifoMessageQueue(configuredPrefetch);
         }
 
-        consumerInfo = new JmsConsumerInfo(consumerId, messageQueue);
+        consumerInfo = new JmsConsumerInfo(consumerId, messageQueue, this);
         consumerInfo.setExplicitClientID(connection.isExplicitClientID());
         consumerInfo.setSelector(selector);
         consumerInfo.setDurable(isDurableSubscription());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
index b2c1d07..7addbb7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.meta;
 
 import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsMessageDispatcher;
 import org.apache.qpid.jms.policy.JmsDefaultDeserializationPolicy;
 import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy;
 import org.apache.qpid.jms.policy.JmsDeserializationPolicy;
@@ -49,16 +50,19 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
     // Can be used to track the last consumed message.
     private transient long lastDeliveredSequenceId;
 
-    public JmsConsumerInfo(JmsConsumerId consumerId, MessageQueue messageQueue) {
+    private final JmsMessageDispatcher dispatcher;
+
+    public JmsConsumerInfo(JmsConsumerId consumerId, MessageQueue messageQueue, JmsMessageDispatcher dispatcher) {
         if (consumerId == null) {
             throw new IllegalArgumentException("Consumer ID cannot be null");
         }
         this.consumerId = consumerId;
         this.messageQueue = messageQueue;
+        this.dispatcher = dispatcher;
     }
 
     public JmsConsumerInfo copy() {
-        JmsConsumerInfo info = new JmsConsumerInfo(consumerId, messageQueue);
+        JmsConsumerInfo info = new JmsConsumerInfo(consumerId, messageQueue, dispatcher);
         copy(info);
         return info;
     }
@@ -245,6 +249,10 @@ public final class JmsConsumerInfo extends JmsAbstractResource implements Compar
         this.maxMessages = maxMessages;
     }
 
+    public JmsMessageDispatcher getDispatcher() {
+        return dispatcher;
+    }
+
     @Override
     public String toString() {
         return "JmsConsumerInfo: { " + getId() + ", destination = " + getDestination() + " }";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index c39b55d..d2d7c39 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -125,18 +125,15 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
                 // blocked receive or stop calls that are waiting, unless the consumer is
                 // a participant in a transaction in which case we will just fail the request
                 // and leave the consumer open since the TX needs it to remain active.
-                final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
-                    @Override
-                    public void run() {
-                        LOG.trace("Consumer {} drain request timed out", getConsumerId());
-                        Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
-                        if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
-                            stopRequest.onFailure(cause);
-                            stopRequest = null;
-                        } else {
-                            closeResource(session.getProvider(), cause, false);
-                            session.getProvider().pumpToProtonTransport();
-                        }
+                final ScheduledFuture<?> future = getSession().schedule(() -> {
+                    LOG.trace("Consumer {} drain request timed out", getConsumerId());
+                    Exception cause = new JmsOperationTimedOutException("Remote did not respond to a drain request in time");
+                    if (session.isTransacted() && session.getTransactionContext().isInTransaction(getConsumerId())) {
+                        stopRequest.onFailure(cause);
+                        stopRequest = null;
+                    } else {
+                        closeResource(session.getProvider(), cause, false);
+                        session.getProvider().pumpToProtonTransport();
                     }
                 }, getDrainTimeout());
 
@@ -148,14 +145,11 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
     private void stopOnSchedule(long timeout, final AsyncResult request) {
         LOG.trace("Consumer {} scheduling stop", getConsumerId());
         // We need to drain the credit if no message(s) arrive to use it.
-        final ScheduledFuture<?> future = getSession().schedule(new Runnable() {
-            @Override
-            public void run() {
-                LOG.trace("Consumer {} running scheduled stop", getConsumerId());
-                if (getEndpoint().getRemoteCredit() != 0) {
-                    stop(request);
-                    session.getProvider().pumpToProtonTransport(request);
-                }
+        final ScheduledFuture<?> future = getSession().schedule(() -> {
+            LOG.trace("Consumer {} running scheduled stop", getConsumerId());
+            if (getEndpoint().getRemoteCredit() != 0) {
+                stop(request);
+                session.getProvider().pumpToProtonTransport(request);
             }
         }, timeout);
 
@@ -500,6 +494,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver
             JmsInboundMessageDispatch envelope = new JmsInboundMessageDispatch(getNextIncomingSequenceNumber());
             envelope.setMessage(message);
             envelope.setConsumerId(getResourceInfo().getId());
+            envelope.setConsumerInfo(getResourceInfo());
             // Store link to delivery in the hint for use in acknowledge requests.
             envelope.setProviderHint(incoming);
             envelope.setMessageId(message.getFacade().getProviderMessageIdObject());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
index db8e0a8..0a2b62d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsConsumerInfoTest.java
@@ -58,12 +58,12 @@ public class JmsConsumerInfoTest {
 
     @Test(expected=IllegalArgumentException.class)
     public void testExceptionWhenCreatedWithNullConnectionId() {
-        new JmsConsumerInfo(null, null);
+        new JmsConsumerInfo(null, null, null);
     }
 
     @Test
     public void testCreateFromConsumerId() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
         assertSame(firstId, info.getId());
         assertSame(firstId.getParentId(), info.getParentId());
         assertNotNull(info.toString());
@@ -71,7 +71,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testCopy() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
 
         info.setAcknowledgementMode(1);
         info.setBrowser(true);
@@ -108,7 +108,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsDurable() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
         assertFalse(info.isDurable());
         info.setDurable(true);
         assertTrue(info.isDurable());
@@ -116,7 +116,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsExplicitClientID() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
         assertFalse(info.isExplicitClientID());
         info.setExplicitClientID(true);
         assertTrue(info.isExplicitClientID());
@@ -124,7 +124,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsShared() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
         assertFalse(info.isShared());
         info.setShared(true);
         assertTrue(info.isShared());
@@ -134,7 +134,7 @@ public class JmsConsumerInfoTest {
     public void testGetSubscriptionName() {
         String subName = "name";
 
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
         assertNull(info.getSubscriptionName());
         info.setSubscriptionName(subName);
         assertEquals(subName, info.getSubscriptionName());
@@ -142,8 +142,8 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testCompareTo() {
-        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
-        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null);
+        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null, null);
 
         assertEquals(-1, first.compareTo(second));
         assertEquals(0, first.compareTo(first));
@@ -152,8 +152,8 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testHashCode() {
-        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
-        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null);
+        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null, null);
 
         assertEquals(first.hashCode(), first.hashCode());
         assertEquals(second.hashCode(), second.hashCode());
@@ -161,10 +161,11 @@ public class JmsConsumerInfoTest {
         assertFalse(first.hashCode() == second.hashCode());
     }
 
+    @SuppressWarnings("unlikely-arg-type")
     @Test
     public void testEqualsCode() {
-        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
-        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null);
+        JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null);
+        JmsConsumerInfo second = new JmsConsumerInfo(secondId, null, null);
 
         assertEquals(first, first);
         assertEquals(second, second);
@@ -178,7 +179,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testVisit() throws Exception {
-        final JmsConsumerInfo first = new JmsConsumerInfo(firstId, null);
+        final JmsConsumerInfo first = new JmsConsumerInfo(firstId, null, null);
 
         final AtomicBoolean visited = new AtomicBoolean();
 
@@ -196,7 +197,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testGetRedeliveryPolicyDefaults() {
-        final JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
+        final JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
 
         assertNotNull(info.getRedeliveryPolicy());
         info.setRedeliveryPolicy(null);
@@ -206,7 +207,7 @@ public class JmsConsumerInfoTest {
 
     @Test
     public void testIsListener() {
-        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null);
+        JmsConsumerInfo info = new JmsConsumerInfo(firstId, null, null);
         assertFalse(info.isListener());
         info.setListener(true);
         assertTrue(info.isListener());

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
index 00bf2f4..39f5895 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsDefaultResourceVisitorTest.java
@@ -48,7 +48,7 @@ public class JmsDefaultResourceVisitorTest {
         JmsDefaultResourceVisitor visitor = new JmsDefaultResourceVisitor();
         visitor.processConnectionInfo(new JmsConnectionInfo(connectionId));
         visitor.processSessionInfo(new JmsSessionInfo(sessionId));
-        visitor.processConsumerInfo(new JmsConsumerInfo(consumerId, null));
+        visitor.processConsumerInfo(new JmsConsumerInfo(consumerId, null, null));
         visitor.processProducerInfo(new JmsProducerInfo(producerId));
         visitor.processDestination(new JmsTemporaryTopic("Test"));
         visitor.processTransactionInfo(new JmsTransactionInfo(sessionId, transactionId));

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
index f60af09..755bd21 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSubscriptionTrackerTest.java
@@ -45,7 +45,7 @@ public class AmqpSubscriptionTrackerTest {
         JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, consumerIdCounter.incrementAndGet());
         JmsTopic topic = new JmsTopic(topicName);
 
-        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId, null);
+        JmsConsumerInfo consumerInfo = new JmsConsumerInfo(consumerId, null, null);
 
         consumerInfo.setSubscriptionName(subscriptionName);
         consumerInfo.setDestination(topic);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
index 472d756..858cc7f 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpCodecTest.java
@@ -84,7 +84,7 @@ public class AmqpCodecTest extends QpidJmsTestCase {
         JmsConsumerId consumerId = new JmsConsumerId("ID:MOCK:1", 1, 1);
         mockConnection = Mockito.mock(AmqpConnection.class);
         mockConsumer = Mockito.mock(AmqpConsumer.class);
-        Mockito.when(mockConsumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null));
+        Mockito.when(mockConsumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null, null));
     }
 
     //----- AmqpHeader encode and decode -------------------------------------//

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
index dede595..1db2744 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageTypesTestCase.java
@@ -131,7 +131,7 @@ public class AmqpJmsMessageTypesTestCase extends QpidJmsTestCase {
         AmqpConsumer consumer = Mockito.mock(AmqpConsumer.class);
         Mockito.when(consumer.getConnection()).thenReturn(connection);
         Mockito.when(consumer.getDestination()).thenReturn(consumerDestination);
-        Mockito.when(consumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null));
+        Mockito.when(consumer.getResourceInfo()).thenReturn(new JmsConsumerInfo(consumerId, null, null));
         return consumer;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/56b24dc4/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
index 19f6d3e..ebce5d7 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverProviderTestSupport.java
@@ -78,7 +78,7 @@ public class FailoverProviderTestSupport extends QpidJmsTestCase {
 
     protected JmsConsumerInfo createConsumerInfo(JmsSessionInfo session) {
         JmsConsumerId id = new JmsConsumerId(session.getId(), nextConsumerId.incrementAndGet());
-        JmsConsumerInfo consumer = new JmsConsumerInfo(id, null);
+        JmsConsumerInfo consumer = new JmsConsumerInfo(id, null, null);
         return consumer;
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/2] qpid-jms git commit: QPIDJMS-420 Refactor the MessageConsumer prefetch queue

Posted by ta...@apache.org.
QPIDJMS-420 Refactor the MessageConsumer prefetch queue

Refactor the MessageQueue based implementations to remove some unused
APIs and simplify the interface.  Improves performance of the default
FifoMessageQueue and separates the implementation from the priority
based variant to allow for more efficiencies to be added in.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d96734e0
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d96734e0
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d96734e0

Branch: refs/heads/master
Commit: d96734e0f4e3c11245e3aae774ef112b158bacfe
Parents: 56b24dc
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Oct 26 16:44:23 2018 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Oct 26 16:44:23 2018 -0400

----------------------------------------------------------------------
 .../qpid/jms/util/AbstractMessageQueue.java     | 148 -----------------
 .../apache/qpid/jms/util/FifoMessageQueue.java  | 158 +++++++++++++++----
 .../org/apache/qpid/jms/util/MessageQueue.java  |  43 ++---
 .../qpid/jms/util/PriorityMessageQueue.java     | 131 ++++++++++-----
 .../qpid/jms/util/FifoMessageQueueTest.java     | 118 ++++++--------
 .../qpid/jms/util/PriorityMessageQueueTest.java | 115 +++++---------
 6 files changed, 308 insertions(+), 405 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d96734e0/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
deleted file mode 100644
index 788aca3..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/AbstractMessageQueue.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms.util;
-
-import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
-
-/**
- * Abstract Message Queue class used to implement the common functions of a Message Queue
- * instance.
- */
-public abstract class AbstractMessageQueue implements MessageQueue {
-
-    private volatile boolean closed;
-    private volatile boolean running;
-    private final Object lock = new Object();
-
-    private int waiters;
-
-    @Override
-    public final JmsInboundMessageDispatch peek() {
-        synchronized (lock) {
-            return peekFirst();
-        }
-    }
-
-    @Override
-    public final JmsInboundMessageDispatch dequeue(long timeout) throws InterruptedException {
-        synchronized (lock) {
-            // Wait until the consumer is ready to deliver messages.
-            while (timeout != 0 && !closed && isEmpty() && running) {
-                waiters++;
-                try {
-                    if (timeout == -1) {
-                        lock.wait();
-                    } else {
-                        long start = System.currentTimeMillis();
-                        lock.wait(timeout);
-                        timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
-                    }
-                } finally {
-                    waiters--;
-                }
-            }
-
-            if (closed || !running || isEmpty()) {
-                return null;
-            }
-
-            return removeFirst();
-        }
-    }
-
-    @Override
-    public final JmsInboundMessageDispatch dequeueNoWait() {
-        synchronized (lock) {
-            if (closed || !running || isEmpty()) {
-                return null;
-            }
-            return removeFirst();
-        }
-    }
-
-    @Override
-    public final void start() {
-        synchronized (lock) {
-            if (!closed) {
-                running = true;
-            }
-
-            if (hasWaiters()) {
-                lock.notifyAll();
-            }
-        }
-    }
-
-    @Override
-    public final void stop() {
-        synchronized (lock) {
-            running = false;
-            if (hasWaiters()) {
-                lock.notifyAll();
-            }
-        }
-    }
-
-    @Override
-    public final boolean isRunning() {
-        return running;
-    }
-
-    @Override
-    public final void close() {
-        synchronized (lock) {
-            running = false;
-            closed = true;
-            if (hasWaiters()) {
-                lock.notifyAll();
-            }
-        }
-    }
-
-    @Override
-    public final boolean isClosed() {
-        return closed;
-    }
-
-    @Override
-    public final Object getLock() {
-        return lock;
-    }
-
-    protected boolean hasWaiters() {
-        return waiters > 0;
-    }
-
-    /**
-     * Removes and returns the first entry in the implementation queue.  This method
-     * is always called under lock and does not need to protect itself or check running
-     * state etc.
-     *
-     * @return the first message queued in the implemented queue.
-     */
-    protected abstract JmsInboundMessageDispatch removeFirst();
-
-    /**
-     * Returns but does not remove the first entry in the implementation queue.  This method
-     * is always called under lock and does not need to protect itself or check running
-     * state etc.
-     *
-     * @return the first message queued in the implemented queue.
-     */
-    protected abstract JmsInboundMessageDispatch peekFirst();
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d96734e0/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
index 40f3961..83fa532 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/FifoMessageQueue.java
@@ -17,16 +17,30 @@
 package org.apache.qpid.jms.util;
 
 import java.util.ArrayDeque;
-import java.util.ArrayList;
 import java.util.Deque;
-import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 
 /**
  * Simple first in / first out Message Queue.
  */
-public final class FifoMessageQueue extends AbstractMessageQueue {
+public final class FifoMessageQueue implements MessageQueue {
+
+    protected static final AtomicIntegerFieldUpdater<FifoMessageQueue> STATE_FIELD_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(FifoMessageQueue.class, "state");
+
+    protected static final int CLOSED = 0;
+    protected static final int STOPPED = 1;
+    protected static final int RUNNING = 2;
+
+    private volatile int state = STOPPED;
+
+    protected final ReentrantLock lock = new ReentrantLock();
+    protected final Condition condition = lock.newCondition();
 
     protected final Deque<JmsInboundMessageDispatch> queue;
 
@@ -36,71 +50,149 @@ public final class FifoMessageQueue extends AbstractMessageQueue {
 
     @Override
     public void enqueueFirst(JmsInboundMessageDispatch envelope) {
-        synchronized (getLock()) {
+        lock.lock();
+        try {
             queue.addFirst(envelope);
-            if (hasWaiters()) {
-                getLock().notify();
-            }
+            condition.signal();
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
     public void enqueue(JmsInboundMessageDispatch envelope) {
-        synchronized (getLock()) {
+        lock.lock();
+        try {
             queue.addLast(envelope);
-            if (hasWaiters()) {
-                getLock().notify();
+            condition.signal();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+
+    @Override
+    public JmsInboundMessageDispatch dequeue(long timeout) throws InterruptedException {
+        lock.lock();
+        try {
+            // Wait until the consumer is ready to deliver messages.
+            while (timeout != 0 && isRunning() && queue.isEmpty()) {
+                if (timeout == -1) {
+                    condition.await();
+                } else {
+                    long start = System.currentTimeMillis();
+                    condition.await(timeout, TimeUnit.MILLISECONDS);
+                    timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
+                }
+            }
+
+            if (!isRunning()) {
+                return null;
             }
+
+            return queue.pollFirst();
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public boolean isEmpty() {
-        synchronized (getLock()) {
-            return queue.isEmpty();
+    public final JmsInboundMessageDispatch dequeueNoWait() {
+        lock.lock();
+        try {
+            if (!isRunning()) {
+                return null;
+            }
+
+            return queue.pollFirst();
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    public int size() {
-        synchronized (getLock()) {
-            return queue.size();
+    public final void start() {
+        if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) {
+            lock.lock();
+            try {
+                condition.signalAll();
+            } finally {
+                lock.unlock();
+            }
         }
     }
 
     @Override
-    public void clear() {
-        synchronized (getLock()) {
-            queue.clear();
+    public final void stop() {
+        if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) {
+            lock.lock();
+            try {
+                condition.signalAll();
+            } finally {
+                lock.unlock();
+            }
         }
     }
 
     @Override
-    public List<JmsInboundMessageDispatch> removeAll() {
-        synchronized (getLock()) {
-            ArrayList<JmsInboundMessageDispatch> rc = new ArrayList<JmsInboundMessageDispatch>(queue.size());
-            for (JmsInboundMessageDispatch entry : queue) {
-                rc.add(entry);
+    public final void close() {
+        if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) {
+            lock.lock();
+            try {
+                condition.signalAll();
+            } finally {
+                lock.unlock();
             }
-            queue.clear();
-            return rc;
         }
     }
 
     @Override
-    public String toString() {
-        synchronized (getLock()) {
-            return queue.toString();
+    public final boolean isRunning() {
+        return state == RUNNING;
+    }
+
+    @Override
+    public final boolean isClosed() {
+        return state == CLOSED;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        lock.lock();
+        try {
+            return queue.isEmpty();
+        } finally {
+            lock.unlock();
         }
     }
 
     @Override
-    protected JmsInboundMessageDispatch removeFirst() {
-        return queue.removeFirst();
+    public int size() {
+        lock.lock();
+        try {
+            return queue.size();
+        } finally {
+            lock.unlock();
+        }
     }
 
     @Override
-    protected JmsInboundMessageDispatch peekFirst() {
-        return queue.peekFirst();
+    public void clear() {
+        lock.lock();
+        try {
+            queue.clear();
+        } finally {
+            lock.unlock();
+        }
+    }
+
+    @Override
+    public String toString() {
+        lock.lock();
+        try {
+            return queue.toString();
+        } finally {
+            lock.unlock();
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d96734e0/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
index 1279b1a..9fcb0b6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/MessageQueue.java
@@ -16,8 +16,6 @@
  */
 package org.apache.qpid.jms.util;
 
-import java.util.List;
-
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 
 /**
@@ -42,18 +40,6 @@ public interface MessageQueue {
     void enqueueFirst(JmsInboundMessageDispatch envelope);
 
     /**
-     * @return true if there are no messages in the queue.
-     */
-    boolean isEmpty();
-
-    /**
-     * Return but do not remove the first element in the Message queue.
-     *
-     * @return the first element in the Queue or null if empty.
-     */
-    JmsInboundMessageDispatch peek();
-
-    /**
      * Used to get an enqueued message. The amount of time this method blocks is
      * based on the timeout value. - if timeout==-1 then it blocks until a
      * message is received. - if timeout==0 then it it tries to not block at
@@ -85,27 +71,32 @@ public interface MessageQueue {
 
     /**
      * Stops the Message Queue.  Messages cannot be read from the Queue when it is in
-     * the stopped state and any waiters will be awoken.
+     * the stopped state and any waiters will be woken.
      */
     void stop();
 
     /**
-     * @return true if the Queue is not in the stopped or closed state.
-     */
-    boolean isRunning();
-
-    /**
      * Closes the Message Queue.  No messages can be added or removed from the Queue
      * once it has entered the closed state.
      */
     void close();
 
     /**
+     * @return true if the Queue is not in the stopped or closed state.
+     */
+    boolean isRunning();
+
+    /**
      * @return true if the Queue has been closed.
      */
     boolean isClosed();
 
     /**
+     * @return true if there are no messages in the queue.
+     */
+    boolean isEmpty();
+
+    /**
      * Returns the number of Messages currently in the Queue.  This value is only
      * meaningful at the time of the call as the size of the Queue changes rapidly
      * as Messages arrive and are consumed.
@@ -119,16 +110,4 @@ public interface MessageQueue {
      */
     void clear();
 
-    /**
-     * Removes and returns all Messages in the Queue.
-     *
-     * @return a List containing all Messages removed from the Queue.
-     */
-    List<JmsInboundMessageDispatch> removeAll();
-
-    /**
-     * @return the lock object used to protect against concurrent access.
-     */
-    Object getLock();
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d96734e0/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
index 88b7cf4..5a3fb93 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/util/PriorityMessageQueue.java
@@ -16,9 +16,8 @@
  */
 package org.apache.qpid.jms.util;
 
-import java.util.ArrayList;
 import java.util.LinkedList;
-import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
 
 import javax.jms.JMSException;
 
@@ -29,7 +28,18 @@ import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
  * Queue based on their priority value, except where {@link #enqueueFirst} is
  * used.
  */
-public final class PriorityMessageQueue extends AbstractMessageQueue {
+public final class PriorityMessageQueue implements MessageQueue {
+
+    protected static final AtomicIntegerFieldUpdater<PriorityMessageQueue> STATE_FIELD_UPDATER =
+            AtomicIntegerFieldUpdater.newUpdater(PriorityMessageQueue.class, "state");
+
+    protected static final int CLOSED = 0;
+    protected static final int STOPPED = 1;
+    protected static final int RUNNING = 2;
+
+    private volatile int state = STOPPED;
+
+    protected final Object lock = new Object();
 
     // There are 10 priorities, values 0-9
     private static final Integer MAX_PRIORITY = 9;
@@ -47,85 +57,122 @@ public final class PriorityMessageQueue extends AbstractMessageQueue {
 
     @Override
     public void enqueue(JmsInboundMessageDispatch envelope) {
-        synchronized (getLock()) {
+        synchronized (lock) {
             getList(envelope).addLast(envelope);
             this.size++;
-            if (hasWaiters()) {
-                getLock().notify();
-            }
+            lock.notify();
         }
     }
 
     @Override
     public void enqueueFirst(JmsInboundMessageDispatch envelope) {
-        synchronized (getLock()) {
+        synchronized (lock) {
             getList(MAX_PRIORITY).addFirst(envelope);
             this.size++;
-            if (hasWaiters()) {
-                getLock().notify();
-            }
+            lock.notify();
         }
     }
 
     @Override
-    public boolean isEmpty() {
-        synchronized (getLock()) {
-            return size == 0;
+    public final JmsInboundMessageDispatch dequeue(long timeout) throws InterruptedException {
+        synchronized (lock) {
+            // Wait until the consumer is ready to deliver messages.
+            while (timeout != 0 && isRunning() && isEmpty()) {
+                if (timeout == -1) {
+                    lock.wait();
+                } else {
+                    long start = System.currentTimeMillis();
+                    lock.wait(timeout);
+                    timeout = Math.max(timeout + start - System.currentTimeMillis(), 0);
+                }
+            }
+
+            if (!isRunning() || isEmpty()) {
+                return null;
+            }
+
+            return removeFirst();
         }
     }
 
     @Override
-    public int size() {
-        synchronized (getLock()) {
-            return size;
+    public final JmsInboundMessageDispatch dequeueNoWait() {
+        synchronized (lock) {
+            if (!isRunning() || isEmpty()) {
+                return null;
+            }
+            return removeFirst();
         }
     }
 
     @Override
-    public void clear() {
-        synchronized (getLock()) {
-            for (int i = 0; i <= MAX_PRIORITY; i++) {
-                lists[i].clear();
+    public final void start() {
+        if (STATE_FIELD_UPDATER.compareAndSet(this, STOPPED, RUNNING)) {
+            synchronized (lock) {
+                lock.notifyAll();
             }
-            this.size = 0;
         }
     }
 
     @Override
-    public List<JmsInboundMessageDispatch> removeAll() {
-        synchronized (getLock()) {
-            ArrayList<JmsInboundMessageDispatch> result = new ArrayList<JmsInboundMessageDispatch>(size());
-            for (int i = MAX_PRIORITY; i >= 0; i--) {
-                List<JmsInboundMessageDispatch> list = lists[i];
-                result.addAll(list);
-                size -= list.size();
-                list.clear();
+    public final void stop() {
+        if (STATE_FIELD_UPDATER.compareAndSet(this, RUNNING, STOPPED)) {
+            synchronized (lock) {
+                lock.notifyAll();
             }
-            return result;
         }
     }
 
     @Override
-    protected JmsInboundMessageDispatch removeFirst() {
-        if (this.size > 0) {
-            for (int i = MAX_PRIORITY; i >= 0; i--) {
-                LinkedList<JmsInboundMessageDispatch> list = lists[i];
-                if (!list.isEmpty()) {
-                    this.size--;
-                    return list.removeFirst();
-                }
+    public final void close() {
+        if (STATE_FIELD_UPDATER.getAndSet(this, CLOSED) > CLOSED) {
+            synchronized (lock) {
+                lock.notifyAll();
             }
         }
-        return null;
     }
 
     @Override
-    protected JmsInboundMessageDispatch peekFirst() {
+    public final boolean isRunning() {
+        return state == RUNNING;
+    }
+
+    @Override
+    public final boolean isClosed() {
+        return state == CLOSED;
+    }
+
+    @Override
+    public boolean isEmpty() {
+        synchronized (lock) {
+            return size == 0;
+        }
+    }
+
+    @Override
+    public int size() {
+        synchronized (lock) {
+            return size;
+        }
+    }
+
+    @Override
+    public void clear() {
+        synchronized (lock) {
+            for (int i = 0; i <= MAX_PRIORITY; i++) {
+                lists[i].clear();
+            }
+            this.size = 0;
+        }
+    }
+
+    private JmsInboundMessageDispatch removeFirst() {
         if (this.size > 0) {
             for (int i = MAX_PRIORITY; i >= 0; i--) {
                 LinkedList<JmsInboundMessageDispatch> list = lists[i];
                 if (!list.isEmpty()) {
-                    return list.peekFirst();
+                    this.size--;
+                    return list.removeFirst();
                 }
             }
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d96734e0/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
index 3d3416e..590e9e9 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/FifoMessageQueueTest.java
@@ -23,10 +23,13 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.JMSException;
 
@@ -41,7 +44,7 @@ import org.junit.Test;
  */
 public class FifoMessageQueueTest {
 
-    private MessageQueue queue;
+    private FifoMessageQueue queue;
     private final IdGenerator messageId = new IdGenerator();
     private long sequence;
 
@@ -57,11 +60,6 @@ public class FifoMessageQueueTest {
     }
 
     @Test
-    public void testGetLock() {
-        assertNotNull(queue.getLock());
-    }
-
-    @Test
     public void testCreate() {
         FifoMessageQueue queue = new FifoMessageQueue(1000);
 
@@ -159,26 +157,6 @@ public class FifoMessageQueueTest {
     }
 
     @Test
-    public void testRemoveAll() throws JMSException {
-        List<JmsInboundMessageDispatch> messages = createFullRangePrioritySet();
-        Collections.shuffle(messages);
-
-        for (JmsInboundMessageDispatch envelope: messages) {
-            queue.enqueue(envelope);
-        }
-
-        assertFalse(queue.isEmpty());
-        List<JmsInboundMessageDispatch> result = queue.removeAll();
-        assertTrue(queue.isEmpty());
-
-        assertEquals(10, result.size());
-
-        for (byte i = 0; i < 10; ++i) {
-            assertEquals(result.get(i), messages.get(i));
-        }
-    }
-
-    @Test
     public void testRemoveFirstOnEmptyQueue() {
         assertNull(queue.dequeueNoWait());
     }
@@ -216,48 +194,6 @@ public class FifoMessageQueueTest {
         assertTrue(queue.isEmpty());
     }
 
-    @Test
-    public void testPeekOnEmptyQueue() {
-        assertNull(queue.peek());
-    }
-
-    @Test
-    public void testPeekFirst() throws JMSException {
-        List<JmsInboundMessageDispatch> messages = createFullRangePrioritySet();
-        Collections.shuffle(messages);
-
-        for (JmsInboundMessageDispatch envelope: messages) {
-            queue.enqueue(envelope);
-        }
-
-        for (byte i = 0; i < 10; ++i) {
-            JmsInboundMessageDispatch first = queue.peek();
-            assertEquals(first, messages.get(i));
-            queue.dequeueNoWait();
-        }
-
-        assertTrue(queue.isEmpty());
-    }
-
-    @Test
-    public void testPeekFirstSparse() throws JMSException {
-        queue.enqueue(createEnvelope(9));
-        queue.enqueue(createEnvelope(4));
-        queue.enqueue(createEnvelope(1));
-
-        JmsInboundMessageDispatch envelope = queue.peek();
-        assertEquals(9, envelope.getMessage().getJMSPriority());
-        queue.dequeueNoWait();
-        envelope = queue.peek();
-        assertEquals(4, envelope.getMessage().getJMSPriority());
-        queue.dequeueNoWait();
-        envelope = queue.peek();
-        assertEquals(1, envelope.getMessage().getJMSPriority());
-        queue.dequeueNoWait();
-
-        assertTrue(queue.isEmpty());
-    }
-
     @Test(timeout = 10000)
     public void testDequeueWaitsUntilMessageArrives() throws InterruptedException {
         final JmsInboundMessageDispatch message = createEnvelope();
@@ -278,16 +214,16 @@ public class FifoMessageQueueTest {
     }
 
     @Test(timeout = 10000)
-    public void testDequeueWaitsUntilMessageArrivesWhenLockNotified() throws InterruptedException {
+    public void testDequeueWaitsUntilMessageArrivesWhenLockNotified() throws Exception {
         doDequeueWaitsUntilMessageArrivesWhenLockNotifiedTestImpl(-1);
     }
 
     @Test(timeout = 10000)
-    public void testTimedDequeueWaitsUntilMessageArrivesWhenLockNotified() throws InterruptedException {
+    public void testTimedDequeueWaitsUntilMessageArrivesWhenLockNotified() throws Exception {
         doDequeueWaitsUntilMessageArrivesWhenLockNotifiedTestImpl(100000);
     }
 
-    private void doDequeueWaitsUntilMessageArrivesWhenLockNotifiedTestImpl(int timeout) throws InterruptedException {
+    private void doDequeueWaitsUntilMessageArrivesWhenLockNotifiedTestImpl(int timeout) throws Exception {
         final JmsInboundMessageDispatch message = createEnvelope();
         Thread runner = new Thread(new Runnable() {
 
@@ -297,9 +233,13 @@ public class FifoMessageQueueTest {
                     TimeUnit.MILLISECONDS.sleep(100);
                 } catch (InterruptedException e) {
                 }
-                synchronized (queue.getLock()) {
-                    queue.getLock().notify();
+
+                try {
+                    singalQueue(queue);
+                } catch (Exception e1) {
+                    return;
                 }
+
                 try {
                     TimeUnit.MILLISECONDS.sleep(100);
                 } catch (InterruptedException e) {
@@ -388,4 +328,36 @@ public class FifoMessageQueueTest {
 
         return message;
     }
+
+    private void singalQueue(FifoMessageQueue queue) throws Exception {
+        Field lock = null;
+        Field condition = null;
+        Class<?> queueType = queue.getClass();
+
+        while (queueType != null && lock == null) {
+            try {
+                lock = queueType.getDeclaredField("lock");
+                condition = queueType.getDeclaredField("condition");
+            } catch (NoSuchFieldException error) {
+                queueType = queueType.getSuperclass();
+                if (Object.class.equals(queueType)) {
+                    queueType = null;
+                }
+            }
+        }
+
+        assertNotNull("MessageQueue implementation unknown", lock);
+        lock.setAccessible(true);
+        condition.setAccessible(true);
+
+        ReentrantLock lockView = (ReentrantLock) lock.get(queue);
+        Condition conditionView = (Condition) condition.get(queue);
+
+        lockView.lock();
+        try {
+            conditionView.signal();
+        } finally {
+            lockView.unlock();
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d96734e0/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/PriorityMessageQueueTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/PriorityMessageQueueTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/PriorityMessageQueueTest.java
index 2c38dfa..2c10e74 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/PriorityMessageQueueTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/util/PriorityMessageQueueTest.java
@@ -24,8 +24,8 @@ import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.lang.reflect.Field;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
@@ -44,7 +44,7 @@ import org.mockito.Mockito;
  */
 public class PriorityMessageQueueTest {
 
-    private MessageQueue queue;
+    private PriorityMessageQueue queue;
     private final IdGenerator messageId = new IdGenerator();
     private long sequence;
 
@@ -55,11 +55,6 @@ public class PriorityMessageQueueTest {
     }
 
     @Test
-    public void testGetLock() {
-        assertNotNull(queue.getLock());
-    }
-
-    @Test
     public void testCreate() {
         PriorityMessageQueue queue = new PriorityMessageQueue();
 
@@ -188,27 +183,6 @@ public class PriorityMessageQueueTest {
     }
 
     @Test
-    public void testRemoveAll() throws JMSException {
-        List<JmsInboundMessageDispatch> messages = createFullRangePrioritySet();
-        Collections.shuffle(messages);
-
-        for (JmsInboundMessageDispatch envelope: messages) {
-            queue.enqueue(envelope);
-        }
-
-        assertFalse(queue.isEmpty());
-        List<JmsInboundMessageDispatch> result = queue.removeAll();
-        assertTrue(queue.isEmpty());
-
-        assertEquals(10, result.size());
-
-        for (byte i = 9; i >= 0; --i) {
-            JmsInboundMessageDispatch envelope = result.remove(0);
-            assertEquals(i, envelope.getMessage().getJMSPriority());
-        }
-    }
-
-    @Test
     public void testRemoveFirstOnEmptyQueue() {
         assertNull(queue.dequeueNoWait());
     }
@@ -245,47 +219,6 @@ public class PriorityMessageQueueTest {
         assertTrue(queue.isEmpty());
     }
 
-    @Test
-    public void testPeekOnEmptyQueue() {
-        assertNull(queue.peek());
-    }
-
-    @Test
-    public void testPeekFirst() throws JMSException {
-        List<JmsInboundMessageDispatch> messages = createFullRangePrioritySet();
-
-        for (JmsInboundMessageDispatch envelope: messages) {
-            queue.enqueue(envelope);
-        }
-
-        for (byte i = 9; i >= 0; --i) {
-            JmsInboundMessageDispatch first = queue.peek();
-            assertEquals(i, first.getMessage().getJMSPriority());
-            queue.dequeueNoWait();
-        }
-
-        assertTrue(queue.isEmpty());
-    }
-
-    @Test
-    public void testPeekFirstSparse() throws JMSException {
-        queue.enqueue(createEnvelope(9));
-        queue.enqueue(createEnvelope(4));
-        queue.enqueue(createEnvelope(1));
-
-        JmsInboundMessageDispatch envelope = queue.peek();
-        assertEquals(9, envelope.getMessage().getJMSPriority());
-        queue.dequeueNoWait();
-        envelope = queue.peek();
-        assertEquals(4, envelope.getMessage().getJMSPriority());
-        queue.dequeueNoWait();
-        envelope = queue.peek();
-        assertEquals(1, envelope.getMessage().getJMSPriority());
-        queue.dequeueNoWait();
-
-        assertTrue(queue.isEmpty());
-    }
-
     @Test(timeout = 10000)
     public void testDequeueWaitsUntilMessageArrives() throws InterruptedException {
         doDequeueWaitsUntilMessageArrivesTestImpl(-1);
@@ -325,6 +258,7 @@ public class PriorityMessageQueueTest {
     }
 
     private void doDequeueWaitsUntilMessageArrivesWhenLockNotifiedTestImpl(int timeout) throws InterruptedException {
+
         final JmsInboundMessageDispatch message = createEnvelope();
         Thread runner = new Thread(new Runnable() {
 
@@ -334,9 +268,13 @@ public class PriorityMessageQueueTest {
                     TimeUnit.MILLISECONDS.sleep(100);
                 } catch (InterruptedException e) {
                 }
-                synchronized (queue.getLock()) {
-                    queue.getLock().notify();
+
+                try {
+                    singalQueue(queue);
+                } catch (Exception e1) {
+                    return;
                 }
+
                 try {
                     TimeUnit.MILLISECONDS.sleep(100);
                 } catch (InterruptedException e) {
@@ -400,18 +338,16 @@ public class PriorityMessageQueueTest {
         queue.enqueue(message);
         queue.enqueue(createEnvelope(1));
 
-        JmsInboundMessageDispatch envelope = queue.peek();
+        JmsInboundMessageDispatch envelope = queue.dequeueNoWait();
         assertEquals(9, envelope.getMessage().getJMSPriority());
-        queue.dequeueNoWait();
-        envelope = queue.peek();
+
+        envelope = queue.dequeueNoWait();
         try {
             envelope.getMessage().getJMSPriority();
             fail("Unreadable priority message should sit at default level");
         } catch (MessageNotReadableException mnre) {}
-        queue.dequeueNoWait();
-        envelope = queue.peek();
+        envelope = queue.dequeueNoWait();
         assertEquals(1, envelope.getMessage().getJMSPriority());
-        queue.dequeueNoWait();
 
         assertTrue(queue.isEmpty());
     }
@@ -458,4 +394,29 @@ public class PriorityMessageQueueTest {
 
         return message;
     }
+
+    private void singalQueue(PriorityMessageQueue queue) throws Exception {
+        Field lock = null;
+        Class<?> queueType = queue.getClass();
+
+        while (queueType != null && lock == null) {
+            try {
+                lock = queueType.getDeclaredField("lock");
+            } catch (NoSuchFieldException error) {
+                queueType = queueType.getSuperclass();
+                if (Object.class.equals(queueType)) {
+                    queueType = null;
+                }
+            }
+        }
+
+        assertNotNull("MessageQueue implementation unknown", lock);
+        lock.setAccessible(true);
+
+        Object lockView = lock.get(queue);
+
+        synchronized (lockView) {
+            lockView.notify();
+        }
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org