You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/02/10 16:12:53 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5038 - close
active sessions on deactivate to ensure consumption stops for an endpoint
Updated Branches:
refs/heads/trunk 3df943ce0 -> 519d8f7db
https://issues.apache.org/jira/browse/AMQ-5038 - close active sessions on deactivate to ensure consumption stops for an endpoint
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/519d8f7d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/519d8f7d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/519d8f7d
Branch: refs/heads/trunk
Commit: 519d8f7db0d3243cb3b60b156369f3f069e20e52
Parents: 3df943c
Author: gtully <ga...@gmail.com>
Authored: Mon Feb 10 15:11:13 2014 +0000
Committer: gtully <ga...@gmail.com>
Committed: Mon Feb 10 15:12:39 2014 +0000
----------------------------------------------------------------------
.../activemq/JmsQueueTransactionTest.java | 3 +
.../activemq/ra/ActiveMQEndpointWorker.java | 8 +-
.../apache/activemq/ra/ServerSessionImpl.java | 2 +-
.../activemq/ra/ServerSessionPoolImpl.java | 18 ++-
.../activemq/ra/ServerSessionImplTest.java | 162 +++++++++++++++++--
5 files changed, 171 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
index 6504d7c..c2e9510 100755
--- a/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
+++ b/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
@@ -183,6 +183,7 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
assertTrue("Should have received the third message", enumeration.hasMoreElements());
assertEquals(outbound[2], (Message)enumeration.nextElement());
+ LOG.info("Check for more...");
// There should be no more.
boolean tooMany = false;
while (enumeration.hasMoreElements()) {
@@ -190,8 +191,10 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
tooMany = true;
}
assertFalse(tooMany);
+ LOG.info("close browser...");
browser.close();
+ LOG.info("reopen and consume...");
// Re-open the consumer.
consumer = resourceProvider.createConsumer(session, destination);
// Receive the second.
http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
index 4765520..b18ef29 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
@@ -21,7 +21,6 @@ import java.lang.reflect.Method;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
-import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -117,7 +116,7 @@ public class ActiveMQEndpointWorker {
if (connecting.compareAndSet(false, true)) {
synchronized (connectWork) {
disconnect();
- serverSessionPool.closeIdleSessions();
+ serverSessionPool.closeSessions();
connect();
}
} else {
@@ -328,6 +327,11 @@ public class ActiveMQEndpointWorker {
THREAD_LOCAL.set(null);
}
+ // for testing
+ public void setConnection(ActiveMQConnection activeMQConnection) {
+ this.connection = activeMQConnection;
+ }
+
protected ActiveMQConnection getConnection() {
// make sure we only return a working connection
// in particular make sure that we do not return null
http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
index f6f965f..27c75b1 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
@@ -166,7 +166,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
try {
InboundContextSupport.register(this);
if ( session.isRunning() ) {
- session.run();
+ session.run();
} else {
log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale");
stale = true;
http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
index ccae078..c0c3320 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
@@ -262,7 +262,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
public void close() {
closing.set(true);
- int activeCount = closeIdleSessions();
+ int activeCount = closeSessions();
// we may have to wait erroneously 250ms if an
// active session is removed during our wait and we
// are not notified
@@ -278,14 +278,26 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
Thread.currentThread().interrupt();
return;
}
- activeCount = closeIdleSessions();
+ activeCount = closeSessions();
}
}
- protected int closeIdleSessions() {
+ protected int closeSessions() {
sessionLock.lock();
try {
+ for (ServerSessionImpl ss : activeSessions) {
+ try {
+ ActiveMQSession session = (ActiveMQSession) ss.getSession();
+ if (!session.isClosed()) {
+ session.close();
+ }
+ } catch (JMSException ignored) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Failed to close active running server session {}, reason:{}", ss, ignored.toString(), ignored);
+ }
+ }
+ }
for (ServerSessionImpl ss : idleSessions) {
ss.close();
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
index a862110..fb99330 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
@@ -16,36 +16,55 @@
*/
package org.apache.activemq.ra;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import javax.jms.Session;
-import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkListener;
import javax.resource.spi.work.WorkManager;
+import javax.transaction.xa.XAResource;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageDispatch;
+import org.hamcrest.Description;
import org.jmock.Expectations;
import org.jmock.Mockery;
+import org.jmock.api.Action;
+import org.jmock.api.Invocation;
import org.jmock.integration.junit4.JMock;
import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
*
*/
@RunWith(JMock.class)
public class ServerSessionImplTest extends TestCase {
- private static final String BROKER_URL = "vm://localhost";
+ private static final Logger LOG = LoggerFactory.getLogger(ServerSessionImplTest.class);
+ private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
private ServerSessionImpl serverSession;
private ServerSessionPoolImpl pool;
private WorkManager workManager;
- private MessageEndpoint messageEndpoint;
+ private MessageEndpointProxy messageEndpoint;
private ActiveMQConnection con;
private ActiveMQSession session;
+ ActiveMQEndpointWorker endpointWorker;
private Mockery context;
-
@Before
public void setUp() throws Exception
{
@@ -57,25 +76,136 @@ public class ServerSessionImplTest extends TestCase {
org.apache.activemq.ActiveMQConnectionFactory factory =
new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL);
con = (ActiveMQConnection) factory.createConnection();
+ con.start();
session = (ActiveMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
- pool = context.mock(ServerSessionPoolImpl.class);
- workManager = context.mock(WorkManager.class);
-
- serverSession = new ServerSessionImpl(
- (ServerSessionPoolImpl) pool,
- session,
- (WorkManager) workManager,
- messageEndpoint,
- false,
- 10);
}
-
+
+ @After
+ public void tearDown() throws Exception {
+ if (con != null) {
+ con.close();
+ }
+ }
+
@Test
public void testRunDetectsStoppedSession() throws Exception {
+
+ pool = context.mock(ServerSessionPoolImpl.class);
+ workManager = context.mock(WorkManager.class);
+ messageEndpoint = context.mock(MessageEndpointProxy.class);
+
+ serverSession = new ServerSessionImpl(
+ (ServerSessionPoolImpl) pool,
+ session,
+ (WorkManager) workManager,
+ messageEndpoint,
+ false,
+ 10);
+
con.close();
context.checking(new Expectations() {{
oneOf (pool).removeFromPool(with(same(serverSession)));
- }});
+ }});
serverSession.run();
}
+
+ @Test
+ public void testCloseCanStopActiveSession() throws Exception {
+
+ final int maxMessages = 4000;
+ final CountDownLatch messageCount = new CountDownLatch(maxMessages);
+
+ final MessageEndpointFactory messageEndpointFactory = context.mock(MessageEndpointFactory.class);
+ final MessageResourceAdapter resourceAdapter = context.mock(MessageResourceAdapter.class);
+ final ActiveMQEndpointActivationKey key = context.mock(ActiveMQEndpointActivationKey.class);
+ messageEndpoint = context.mock(MessageEndpointProxy.class);
+ workManager = context.mock(WorkManager.class);
+ final MessageActivationSpec messageActivationSpec = context.mock(MessageActivationSpec.class);
+ final BootstrapContext boostrapContext = context.mock(BootstrapContext.class);
+ context.checking(new Expectations() {{
+ allowing(boostrapContext).getWorkManager(); will (returnValue(workManager));
+ allowing(resourceAdapter).getBootstrapContext(); will (returnValue(boostrapContext));
+ allowing(messageEndpointFactory).isDeliveryTransacted(with (any(Method.class))); will(returnValue(Boolean.FALSE));
+ allowing(key).getMessageEndpointFactory(); will(returnValue(messageEndpointFactory));
+ allowing(key).getActivationSpec(); will (returnValue(messageActivationSpec));
+ allowing(messageActivationSpec).isUseJndi(); will (returnValue(Boolean.FALSE));
+ allowing(messageActivationSpec).getDestinationType(); will (returnValue("javax.jms.Queue"));
+ allowing(messageActivationSpec).getDestination(); will (returnValue("Queue"));
+ allowing(messageActivationSpec).getAcknowledgeModeForSession(); will (returnValue(1));
+ allowing(messageActivationSpec).getMaxSessionsIntValue(); will (returnValue(1));
+ allowing(messageActivationSpec).getEnableBatchBooleanValue(); will (returnValue(Boolean.FALSE));
+ allowing(messageActivationSpec).isUseRAManagedTransactionEnabled(); will (returnValue(Boolean.TRUE));
+ allowing(messageEndpointFactory).createEndpoint(with (any(XAResource.class))); will (returnValue(messageEndpoint));
+
+ allowing(workManager).scheduleWork((Work) with(anything()), (long) with(any(long.class)), with(any(ExecutionContext.class)), with(any(WorkListener.class)));
+ will (new Action() {
+ @Override
+ public Object invoke(Invocation invocation) throws Throwable {
+ return null;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ }
+ });
+
+ allowing(messageEndpoint).beforeDelivery((Method) with(anything()));
+ allowing (messageEndpoint).onMessage(with (any(javax.jms.Message.class))); will(new Action(){
+ @Override
+ public Object invoke(Invocation invocation) throws Throwable {
+ messageCount.countDown();
+ if (messageCount.getCount() < maxMessages - 11) {
+ TimeUnit.MILLISECONDS.sleep(200);
+ }
+ return null;
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendText("Keep message count");
+ }
+ });
+ allowing(messageEndpoint).afterDelivery();
+ allowing(messageEndpoint).release();
+
+ }});
+
+ endpointWorker = new ActiveMQEndpointWorker(resourceAdapter, key);
+ endpointWorker.setConnection(con);
+ pool = new ServerSessionPoolImpl(endpointWorker, 2);
+
+ endpointWorker.start();
+ final ServerSessionImpl serverSession1 = (ServerSessionImpl) pool.getServerSession();
+
+ // preload the session dispatch queue to keep the session active
+ ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession();
+ for (int i=0; i<maxMessages; i++) {
+ MessageDispatch messageDispatch = new MessageDispatch();
+ messageDispatch.setMessage(new ActiveMQTextMessage());
+ session1.dispatch(messageDispatch);
+ }
+
+ ExecutorService executorService = Executors.newCachedThreadPool();
+ final CountDownLatch runState = new CountDownLatch(1);
+ executorService.execute(new Runnable(){
+ @Override
+ public void run() {
+ try {
+ serverSession1.run();
+ runState.countDown();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ while (messageCount.getCount() > maxMessages - 10) {
+ TimeUnit.MILLISECONDS.sleep(100);
+ }
+ LOG.info("Closing pool on {}", messageCount.getCount());
+ pool.close();
+
+ assertTrue("run has completed", runState.await(20, TimeUnit.SECONDS));
+ assertTrue("not all messages consumed", messageCount.getCount() > 0);
+ }
}