You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/07/16 13:39:39 UTC

activemq git commit: AMQ-5400 - rework to remove static lock - impacted parallel delivery and hence performance. Fix and additional test

Repository: activemq
Updated Branches:
  refs/heads/master 2b5b890db -> c85c7c147


AMQ-5400 - rework to remove static lock - impacted parallel delivery and hence performance. Fix and additional test


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c85c7c14
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c85c7c14
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c85c7c14

Branch: refs/heads/master
Commit: c85c7c14720dffeb44633547db540533796a8388
Parents: 2b5b890
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 16 12:38:52 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Jul 16 12:38:52 2015 +0100

----------------------------------------------------------------------
 .../org/apache/activemq/ActiveMQSession.java    |  9 +--
 .../java/org/apache/activemq/ra/MDBTest.java    | 74 ++++++++++++++++++++
 2 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c85c7c14/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 129ed51..b3e6ae6 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -200,7 +200,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
     }
 
     private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
-    private static final Object REDELIVERY_GUARD = new Object();
     private final ThreadPoolExecutor connectionExecutor;
 
     protected int acknowledgementMode;
@@ -220,7 +219,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
     protected boolean asyncDispatch;
     protected boolean sessionAsyncDispatch;
     protected final boolean debug;
-    protected Object sendMutex = new Object();
+    protected final Object sendMutex = new Object();
+    protected final Object redeliveryGuard = new Object();
+
     private final AtomicBoolean clearInProgress = new AtomicBoolean();
 
     private MessageListener messageListener;
@@ -930,7 +931,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
             * The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
             * We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
             * */
-            synchronized (REDELIVERY_GUARD) {
+            synchronized (redeliveryGuard) {
                 try {
                     ack.setFirstMessageId(md.getMessage().getMessageId());
                     doStartTransaction();
@@ -1011,7 +1012,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
                                             /*
                                             * wait for the first delivery to be complete, i.e. after delivery has been called.
                                             * */
-                                            synchronized (REDELIVERY_GUARD) {
+                                            synchronized (redeliveryGuard) {
                                                 /*
                                                 * If its non blocking then we can just dispatch in a new session.
                                                 * */

http://git-wip-us.apache.org/repos/asf/activemq/blob/c85c7c14/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
index d58da04..43785f1 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
@@ -312,6 +312,80 @@ public class MDBTest {
         adapter.stop();
     }
 
+    @Test
+    public void testParallelMessageDelivery() throws Exception {
+
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+        Connection connection = factory.createConnection();
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+        adapter.setServerUrl("vm://localhost?broker.persistent=false");
+        adapter.start(new StubBootstrapContext());
+
+        final CountDownLatch messageDelivered = new CountDownLatch(10);
+
+        final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+
+
+            @Override
+            public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
+            }
+
+            @Override
+            public void afterDelivery() throws ResourceException {
+            }
+
+            public void onMessage(Message message) {
+                LOG.info("Message:" + message);
+                super.onMessage(message);
+                try {
+                    TimeUnit.SECONDS.sleep(1);
+                } catch (InterruptedException e) {
+                    e.printStackTrace();
+                }
+                messageDelivered.countDown();
+            };
+        };
+
+        ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+        activationSpec.setDestinationType(Queue.class.getName());
+        activationSpec.setDestination("TEST");
+        activationSpec.setResourceAdapter(adapter);
+        activationSpec.validate();
+
+        MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+            public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+                endpoint.xaresource = resource;
+                return endpoint;
+            }
+
+            public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+                return false;
+            }
+        };
+
+        // Activate an Endpoint
+        adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+
+        // Send ten messages to the "TEST" queue that the activated endpoint consumes from
+        MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+        for (int i=0;i<10;i++) {
+            producer.send(session.createTextMessage(i+"-Hello!"));
+        }
+
+        connection.close();
+
+        // Wait for all ten deliveries; each onMessage sleeps 1s, so the 5s wait can only succeed if deliveries overlap.
+        assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
+
+        // Shut the Endpoint down.
+        adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
+        adapter.stop();
+
+    }
+
     //https://issues.apache.org/jira/browse/AMQ-5811
     @Test(timeout = 90000)
     public void testAsyncStop() throws Exception {