You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2014/02/10 16:12:53 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5038 - close active sessions on deactivate to ensure consumption stops for an endpoint

Updated Branches:
  refs/heads/trunk 3df943ce0 -> 519d8f7db


https://issues.apache.org/jira/browse/AMQ-5038 - close active sessions on deactivate to ensure consumption stops for an endpoint


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/519d8f7d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/519d8f7d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/519d8f7d

Branch: refs/heads/trunk
Commit: 519d8f7db0d3243cb3b60b156369f3f069e20e52
Parents: 3df943c
Author: gtully <ga...@gmail.com>
Authored: Mon Feb 10 15:11:13 2014 +0000
Committer: gtully <ga...@gmail.com>
Committed: Mon Feb 10 15:12:39 2014 +0000

----------------------------------------------------------------------
 .../activemq/JmsQueueTransactionTest.java       |   3 +
 .../activemq/ra/ActiveMQEndpointWorker.java     |   8 +-
 .../apache/activemq/ra/ServerSessionImpl.java   |   2 +-
 .../activemq/ra/ServerSessionPoolImpl.java      |  18 ++-
 .../activemq/ra/ServerSessionImplTest.java      | 162 +++++++++++++++++--
 5 files changed, 171 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
index 6504d7c..c2e9510 100755
--- a/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
+++ b/activemq-broker/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
@@ -183,6 +183,7 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
         assertTrue("Should have received the third message", enumeration.hasMoreElements());
         assertEquals(outbound[2], (Message)enumeration.nextElement());
 
+        LOG.info("Check for more...");
         // There should be no more.
         boolean tooMany = false;
         while (enumeration.hasMoreElements()) {
@@ -190,8 +191,10 @@ public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
             tooMany = true;
         }
         assertFalse(tooMany);
+        LOG.info("close browser...");
         browser.close();
 
+        LOG.info("reopen and consume...");
         // Re-open the consumer.
         consumer = resourceProvider.createConsumer(session, destination);
         // Receive the second.

http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
index 4765520..b18ef29 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ActiveMQEndpointWorker.java
@@ -21,7 +21,6 @@ import java.lang.reflect.Method;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.jms.Connection;
 import javax.jms.ConnectionConsumer;
-import javax.jms.Destination;
 import javax.jms.ExceptionListener;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -117,7 +116,7 @@ public class ActiveMQEndpointWorker {
                                     if (connecting.compareAndSet(false, true)) {
                                         synchronized (connectWork) {
                                             disconnect();
-                                            serverSessionPool.closeIdleSessions();
+                                            serverSessionPool.closeSessions();
                                             connect();
                                         }
                                     } else {
@@ -328,6 +327,11 @@ public class ActiveMQEndpointWorker {
         THREAD_LOCAL.set(null);
     }
 
+    // for testing
+    public void setConnection(ActiveMQConnection activeMQConnection) {
+        this.connection = activeMQConnection;
+    }
+
     protected ActiveMQConnection getConnection() {
         // make sure we only return a working connection
         // in particular make sure that we do not return null

http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
index f6f965f..27c75b1 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionImpl.java
@@ -166,7 +166,7 @@ public class ServerSessionImpl implements ServerSession, InboundContext, Work, D
             try {
                 InboundContextSupport.register(this);
                 if ( session.isRunning() ) {
-                session.run();
+                    session.run();
                 } else {
                     log.debug("JMS Session is no longer running (maybe due to loss of connection?), marking ServerSesison as stale");
                     stale = true;

http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
index ccae078..c0c3320 100755
--- a/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
+++ b/activemq-ra/src/main/java/org/apache/activemq/ra/ServerSessionPoolImpl.java
@@ -262,7 +262,7 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
 
     public void close() {
         closing.set(true);
-        int activeCount = closeIdleSessions();
+        int activeCount = closeSessions();
         // we may have to wait erroneously 250ms if an
         // active session is removed during our wait and we
         // are not notified
@@ -278,14 +278,26 @@ public class ServerSessionPoolImpl implements ServerSessionPool {
                 Thread.currentThread().interrupt();
                 return;
             }
-            activeCount = closeIdleSessions();
+            activeCount = closeSessions();
         }
     }
 
 
-    protected int closeIdleSessions() {
+    protected int closeSessions() {
         sessionLock.lock();
         try {
+            for (ServerSessionImpl ss : activeSessions) {
+                try {
+                    ActiveMQSession session = (ActiveMQSession) ss.getSession();
+                    if (!session.isClosed()) {
+                        session.close();
+                    }
+                } catch (JMSException ignored) {
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Failed to close active running server session {}, reason:{}", ss, ignored.toString(), ignored);
+                    }
+                }
+            }
             for (ServerSessionImpl ss : idleSessions) {
                 ss.close();
             }

http://git-wip-us.apache.org/repos/asf/activemq/blob/519d8f7d/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
----------------------------------------------------------------------
diff --git a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
index a862110..fb99330 100644
--- a/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
+++ b/activemq-ra/src/test/java/org/apache/activemq/ra/ServerSessionImplTest.java
@@ -16,36 +16,55 @@
  */
 package org.apache.activemq.ra;
 
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 import javax.jms.Session;
-import javax.resource.spi.endpoint.MessageEndpoint;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.endpoint.MessageEndpointFactory;
+import javax.resource.spi.work.ExecutionContext;
+import javax.resource.spi.work.Work;
+import javax.resource.spi.work.WorkListener;
 import javax.resource.spi.work.WorkManager;
 
+import javax.transaction.xa.XAResource;
 import junit.framework.TestCase;
 
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQSession;
+import org.apache.activemq.command.ActiveMQTextMessage;
+import org.apache.activemq.command.MessageDispatch;
+import org.hamcrest.Description;
 import org.jmock.Expectations;
 import org.jmock.Mockery;
+import org.jmock.api.Action;
+import org.jmock.api.Invocation;
 import org.jmock.integration.junit4.JMock;
 import org.jmock.lib.legacy.ClassImposteriser;
+import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * 
  */
 @RunWith(JMock.class)
 public class ServerSessionImplTest extends TestCase {
-    private static final String BROKER_URL = "vm://localhost";
+    private static final Logger LOG = LoggerFactory.getLogger(ServerSessionImplTest.class);
+    private static final String BROKER_URL = "vm://localhost?broker.persistent=false";
     private ServerSessionImpl serverSession;
     private ServerSessionPoolImpl pool;
     private WorkManager workManager;
-    private MessageEndpoint messageEndpoint;
+    private MessageEndpointProxy messageEndpoint;
     private ActiveMQConnection con;
     private ActiveMQSession session;
+    ActiveMQEndpointWorker endpointWorker;
     private Mockery context;
-    
     @Before
     public void setUp() throws Exception
     {
@@ -57,25 +76,136 @@ public class ServerSessionImplTest extends TestCase {
         org.apache.activemq.ActiveMQConnectionFactory factory = 
                 new org.apache.activemq.ActiveMQConnectionFactory(BROKER_URL);
         con = (ActiveMQConnection) factory.createConnection();
+        con.start();
         session = (ActiveMQSession) con.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        pool = context.mock(ServerSessionPoolImpl.class);        
-        workManager = context.mock(WorkManager.class);
-        
-        serverSession = new ServerSessionImpl(
-                (ServerSessionPoolImpl) pool, 
-                session, 
-                (WorkManager) workManager, 
-                messageEndpoint, 
-                false, 
-                10);
     }
-    
+
+    @After
+    public void tearDown() throws Exception {
+        if (con != null) {
+            con.close();
+        }
+    }
+
     @Test
     public void testRunDetectsStoppedSession() throws Exception {
+
+        pool = context.mock(ServerSessionPoolImpl.class);
+        workManager = context.mock(WorkManager.class);
+        messageEndpoint = context.mock(MessageEndpointProxy.class);
+
+        serverSession = new ServerSessionImpl(
+                (ServerSessionPoolImpl) pool,
+                session,
+                (WorkManager) workManager,
+                messageEndpoint,
+                false,
+                10);
+
         con.close();
         context.checking(new Expectations() {{
             oneOf (pool).removeFromPool(with(same(serverSession)));
-        }});   
+        }});
         serverSession.run();
     }
+
+    @Test
+    public void testCloseCanStopActiveSession() throws Exception {
+
+        final int maxMessages = 4000;
+        final CountDownLatch messageCount = new CountDownLatch(maxMessages);
+
+        final MessageEndpointFactory messageEndpointFactory = context.mock(MessageEndpointFactory.class);
+        final MessageResourceAdapter resourceAdapter = context.mock(MessageResourceAdapter.class);
+        final ActiveMQEndpointActivationKey key = context.mock(ActiveMQEndpointActivationKey.class);
+        messageEndpoint = context.mock(MessageEndpointProxy.class);
+        workManager = context.mock(WorkManager.class);
+        final MessageActivationSpec messageActivationSpec = context.mock(MessageActivationSpec.class);
+        final BootstrapContext boostrapContext = context.mock(BootstrapContext.class);
+        context.checking(new Expectations() {{
+            allowing(boostrapContext).getWorkManager(); will (returnValue(workManager));
+            allowing(resourceAdapter).getBootstrapContext(); will (returnValue(boostrapContext));
+            allowing(messageEndpointFactory).isDeliveryTransacted(with (any(Method.class))); will(returnValue(Boolean.FALSE));
+            allowing(key).getMessageEndpointFactory();  will(returnValue(messageEndpointFactory));
+            allowing(key).getActivationSpec(); will (returnValue(messageActivationSpec));
+            allowing(messageActivationSpec).isUseJndi(); will (returnValue(Boolean.FALSE));
+            allowing(messageActivationSpec).getDestinationType(); will (returnValue("javax.jms.Queue"));
+            allowing(messageActivationSpec).getDestination(); will (returnValue("Queue"));
+            allowing(messageActivationSpec).getAcknowledgeModeForSession(); will (returnValue(1));
+            allowing(messageActivationSpec).getMaxSessionsIntValue(); will (returnValue(1));
+            allowing(messageActivationSpec).getEnableBatchBooleanValue(); will (returnValue(Boolean.FALSE));
+            allowing(messageActivationSpec).isUseRAManagedTransactionEnabled(); will (returnValue(Boolean.TRUE));
+            allowing(messageEndpointFactory).createEndpoint(with (any(XAResource.class))); will (returnValue(messageEndpoint));
+
+            allowing(workManager).scheduleWork((Work) with(anything()), (long) with(any(long.class)), with(any(ExecutionContext.class)), with(any(WorkListener.class)));
+            will (new Action() {
+                @Override
+                public Object invoke(Invocation invocation) throws Throwable {
+                    return null;
+                }
+
+                @Override
+                public void describeTo(Description description) {
+                }
+            });
+
+            allowing(messageEndpoint).beforeDelivery((Method) with(anything()));
+            allowing (messageEndpoint).onMessage(with (any(javax.jms.Message.class))); will(new Action(){
+                @Override
+                public Object invoke(Invocation invocation) throws Throwable {
+                    messageCount.countDown();
+                    if (messageCount.getCount() < maxMessages - 11) {
+                        TimeUnit.MILLISECONDS.sleep(200);
+                    }
+                    return null;
+                }
+
+                @Override
+                public void describeTo(Description description) {
+                    description.appendText("Keep message count");
+                }
+            });
+            allowing(messageEndpoint).afterDelivery();
+            allowing(messageEndpoint).release();
+
+        }});
+
+        endpointWorker = new ActiveMQEndpointWorker(resourceAdapter, key);
+        endpointWorker.setConnection(con);
+        pool = new ServerSessionPoolImpl(endpointWorker, 2);
+
+        endpointWorker.start();
+        final ServerSessionImpl serverSession1 = (ServerSessionImpl) pool.getServerSession();
+
+        // preload the session dispatch queue to keep the session active
+        ActiveMQSession session1 = (ActiveMQSession) serverSession1.getSession();
+        for (int i=0; i<maxMessages; i++) {
+            MessageDispatch messageDispatch = new  MessageDispatch();
+            messageDispatch.setMessage(new ActiveMQTextMessage());
+            session1.dispatch(messageDispatch);
+        }
+
+        ExecutorService executorService = Executors.newCachedThreadPool();
+        final CountDownLatch runState = new CountDownLatch(1);
+        executorService.execute(new Runnable(){
+            @Override
+            public void run() {
+                try {
+                    serverSession1.run();
+                    runState.countDown();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        });
+
+        while (messageCount.getCount() > maxMessages - 10) {
+            TimeUnit.MILLISECONDS.sleep(100);
+        }
+        LOG.info("Closing pool on {}", messageCount.getCount());
+        pool.close();
+
+        assertTrue("run has completed", runState.await(20, TimeUnit.SECONDS));
+        assertTrue("not all messages consumed", messageCount.getCount() > 0);
+    }
 }