You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/07/16 13:39:39 UTC
activemq git commit: AMQ-5400 - rework to remove static lock -
impacted parallel delivery and hense performance. Fix and additional tet
Repository: activemq
Updated Branches:
refs/heads/master 2b5b890db -> c85c7c147
AMQ-5400 - rework to remove static lock - impacted parallel delivery and hense performance. Fix and additional tet
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c85c7c14
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c85c7c14
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c85c7c14
Branch: refs/heads/master
Commit: c85c7c14720dffeb44633547db540533796a8388
Parents: 2b5b890
Author: gtully <ga...@gmail.com>
Authored: Thu Jul 16 12:38:52 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Thu Jul 16 12:38:52 2015 +0100
----------------------------------------------------------------------
.../org/apache/activemq/ActiveMQSession.java | 9 +--
.../java/org/apache/activemq/ra/MDBTest.java | 74 ++++++++++++++++++++
2 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c85c7c14/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
index 129ed51..b3e6ae6 100755
--- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
+++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java
@@ -200,7 +200,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
}
private static final Logger LOG = LoggerFactory.getLogger(ActiveMQSession.class);
- private static final Object REDELIVERY_GUARD = new Object();
private final ThreadPoolExecutor connectionExecutor;
protected int acknowledgementMode;
@@ -220,7 +219,9 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
protected boolean asyncDispatch;
protected boolean sessionAsyncDispatch;
protected final boolean debug;
- protected Object sendMutex = new Object();
+ protected final Object sendMutex = new Object();
+ protected final Object redeliveryGuard = new Object();
+
private final AtomicBoolean clearInProgress = new AtomicBoolean();
private MessageListener messageListener;
@@ -930,7 +931,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
* The redelivery guard is to allow the endpoint lifecycle to complete before the messsage is dispatched.
* We dont want the after deliver being called after the redeliver as it may cause some weird stuff.
* */
- synchronized (REDELIVERY_GUARD) {
+ synchronized (redeliveryGuard) {
try {
ack.setFirstMessageId(md.getMessage().getMessageId());
doStartTransaction();
@@ -1011,7 +1012,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta
/*
* wait for the first delivery to be complete, i.e. after delivery has been called.
* */
- synchronized (REDELIVERY_GUARD) {
+ synchronized (redeliveryGuard) {
/*
* If its non blocking then we can just dispatch in a new session.
* */
http://git-wip-us.apache.org/repos/asf/activemq/blob/c85c7c14/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
index d58da04..43785f1 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/MDBTest.java
@@ -312,6 +312,80 @@ public class MDBTest {
adapter.stop();
}
+ @Test
+ public void testParallelMessageDelivery() throws Exception {
+
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
+ Connection connection = factory.createConnection();
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ ActiveMQResourceAdapter adapter = new ActiveMQResourceAdapter();
+ adapter.setServerUrl("vm://localhost?broker.persistent=false");
+ adapter.start(new StubBootstrapContext());
+
+ final CountDownLatch messageDelivered = new CountDownLatch(10);
+
+ final StubMessageEndpoint endpoint = new StubMessageEndpoint() {
+
+
+ @Override
+ public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException {
+ }
+
+ @Override
+ public void afterDelivery() throws ResourceException {
+ }
+
+ public void onMessage(Message message) {
+ LOG.info("Message:" + message);
+ super.onMessage(message);
+ try {
+ TimeUnit.SECONDS.sleep(1);
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ messageDelivered.countDown();
+ };
+ };
+
+ ActiveMQActivationSpec activationSpec = new ActiveMQActivationSpec();
+ activationSpec.setDestinationType(Queue.class.getName());
+ activationSpec.setDestination("TEST");
+ activationSpec.setResourceAdapter(adapter);
+ activationSpec.validate();
+
+ MessageEndpointFactory messageEndpointFactory = new MessageEndpointFactory() {
+ public MessageEndpoint createEndpoint(XAResource resource) throws UnavailableException {
+ endpoint.xaresource = resource;
+ return endpoint;
+ }
+
+ public boolean isDeliveryTransacted(Method method) throws NoSuchMethodException {
+ return false;
+ }
+ };
+
+ // Activate an Endpoint
+ adapter.endpointActivation(messageEndpointFactory, activationSpec);
+
+
+ // Send the broker a message to that endpoint
+ MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST"));
+ for (int i=0;i<10;i++) {
+ producer.send(session.createTextMessage(i+"-Hello!"));
+ }
+
+ connection.close();
+
+ // Wait for the message to be delivered.
+ assertTrue(messageDelivered.await(5000, TimeUnit.MILLISECONDS));
+
+ // Shut the Endpoint down.
+ adapter.endpointDeactivation(messageEndpointFactory, activationSpec);
+ adapter.stop();
+
+ }
+
//https://issues.apache.org/jira/browse/AMQ-5811
@Test(timeout = 90000)
public void testAsyncStop() throws Exception {