You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/02/21 11:25:04 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5070 - broker blocked on shutdown

Repository: activemq
Updated Branches:
  refs/heads/trunk dcbac84a8 -> c6fe94ec0


https://issues.apache.org/jira/browse/AMQ-5070 - broker blocked on shutdown


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c6fe94ec
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c6fe94ec
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c6fe94ec

Branch: refs/heads/trunk
Commit: c6fe94ec0388f2acb169c9e3839ade5dc7c2c65f
Parents: dcbac84
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Fri Feb 21 11:22:00 2014 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Fri Feb 21 11:22:11 2014 +0100

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |  10 +-
 .../activemq/broker/TransportConnector.java     |   8 +-
 .../xbean/ConnectorXBeanConfigTest.java         | 114 +++++++++++++++++--
 .../apache/activemq/xbean/connector-test.xml    |  26 +++--
 4 files changed, 137 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 972ffe2..65d044b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -144,9 +144,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                         throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
                     }
                     Command command = (Command) o;
-                    Response response = service(command);
-                    if (response != null && !brokerService.isStopping() ) {
-                        dispatchSync(response);
+                    if (!brokerService.isStopping()) {
+                        Response response = service(command);
+                        if (response != null && !brokerService.isStopping()) {
+                            dispatchSync(response);
+                        }
+                    } else {
+                        throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
                     }
                 } finally {
                     serviceLock.readLock().unlock();

http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index 582bc3f..5aa074b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -216,8 +216,12 @@ public class TransportConnector implements Connector, BrokerServiceAware {
                         @Override
                         public void run() {
                             try {
-                                Connection connection = createConnection(transport);
-                                connection.start();
+                                if (!brokerService.isStopping()) {
+                                    Connection connection = createConnection(transport);
+                                    connection.start();
+                                } else {
+                                    throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
+                                }
                             } catch (Exception e) {
                                 String remoteHost = transport.getRemoteAddress();
                                 ServiceSupport.dispose(transport);

http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
index 4b328dd..ad96459 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
@@ -18,13 +18,9 @@ package org.apache.activemq.xbean;
 
 import java.net.URI;
 import java.util.List;
+import java.util.concurrent.CountDownLatch;
 
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.*;
 
 import junit.framework.TestCase;
 
@@ -36,6 +32,7 @@ import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.MessageIdList;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -86,13 +83,116 @@ public class ConnectorXBeanConfigTest extends TestCase {
         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         conn.start();
         Destination dest = new ActiveMQQueue("test");
-        MessageProducer producer = sess.createProducer(dest);
         MessageConsumer consumer = sess.createConsumer(dest);
+        MessageProducer producer = sess.createProducer(dest);
         producer.send(sess.createTextMessage("test"));
         TextMessage msg = (TextMessage)consumer.receive(1000);
         assertEquals("test", msg.getText());
     }
 
+
+    public void testBrokerWontStop() throws Exception { // AMQ-5070: after brokerService.stop() is requested, a new consumer and a send must both fail
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?async=false");
+        factory.setDispatchAsync(false);
+        factory.setAlwaysSessionAsync(false);
+        Connection conn = factory.createConnection();
+        final Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+        conn.start();
+        final Destination dest = new ActiveMQQueue("TEST");
+        final CountDownLatch stop = new CountDownLatch(1); // released by the listener; lets stopThread call brokerService.stop()
+        final CountDownLatch sendSecond = new CountDownLatch(1); // released by the listener; lets producerThread send "msg2"
+        final CountDownLatch shutdown = new CountDownLatch(1); // released by the main thread at the very end; unblocks the listener
+        final CountDownLatch test = new CountDownLatch(1); // released by the listener; lets the main thread run its assertions
+
+        ActiveMQConnectionFactory testFactory = new ActiveMQConnectionFactory("vm://localhost?async=false"); // second connection, used only for the assertions below
+        Connection testConn = testFactory.createConnection();
+        testConn.start();
+        Destination testDestination = sess.createQueue("NEW"); // NOTE(review): created through sess, not testSess - confirm this is intended
+        Session testSess = testConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageProducer testProducer = testSess.createProducer(testDestination);
+
+        final Thread consumerThread = new Thread() { // sends "msg1" to dest, then registers a listener on dest whose onMessage sleeps and waits on latches
+            @Override
+            public void run() {
+                try {
+                MessageProducer producer = sess.createProducer(dest);
+                producer.send(sess.createTextMessage("msg1"));
+                MessageConsumer consumer = sess.createConsumer(dest);
+                consumer.setMessageListener(new MessageListener() {
+                    @Override
+                    public void onMessage(Message message) {
+                        try {
+                            // after 2s, release producerThread to send "msg2" (a message that will block)
+                            Thread.sleep(2000);
+                            sendSecond.countDown();
+                            // after a further 5s, release stopThread so it tries to stop the broker
+                            Thread.sleep(5000);
+                            stop.countDown();
+                            // after a further 5s, release the main thread to run the test assertions
+                            Thread.sleep(5000);
+                            test.countDown();
+                            shutdown.await(); // stay inside onMessage until the main thread has finished asserting
+                        } catch (InterruptedException ie) { // interruption is ignored; onMessage simply returns
+                        }
+                    }
+                });
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        consumerThread.start();
+
+        final Thread producerThread = new Thread() { // waits for sendSecond, then sends "msg2" to dest through the shared session sess
+            @Override
+            public void run() {
+                try {
+                    sendSecond.await();
+                    MessageProducer producer = sess.createProducer(dest);
+                    producer.send(sess.createTextMessage("msg2"));
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        producerThread.start();
+
+        final Thread stopThread = new Thread() { // waits for stop, then calls brokerService.stop() off the main thread
+            @Override
+            public void run() {
+                try {
+                    stop.await();
+                    brokerService.stop();
+                } catch (Exception e) {
+                    e.printStackTrace();
+                }
+            }
+        };
+
+        stopThread.start();
+
+        test.await(); // no timeout: blocks until the listener counts down the test latch
+        try {
+            testSess.createConsumer(testDestination);
+            fail("Should have failed creating a consumer!");
+        } catch (Exception e) { // any Exception is the expected outcome here; it is only printed
+            e.printStackTrace();
+        }
+
+        try {
+            testProducer.send(testSess.createTextMessage("msg3"));
+            fail("Should have failed sending a message!");
+        } catch (Exception e) { // any Exception is the expected outcome here; it is only printed
+            e.printStackTrace();
+        }
+
+        shutdown.countDown(); // lets the listener blocked in shutdown.await() return
+
+
+    }
+
     @Override
     protected void setUp() throws Exception {
         brokerService = createBroker();

http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml
index 99dd04f..5fb6403 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml
@@ -27,19 +27,27 @@
 
   <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
 
-  <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">
+  <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" persistent="false">
 
+    <destinationPolicy>
+      <policyMap>
+        <policyEntries>
+          <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb" optimizedDispatch="true">
+          </policyEntry>
+        </policyEntries>
+      </policyMap>
+    </destinationPolicy>
 
     <networkConnectors>
       <networkConnector uri="static://(tcp://localhost:61616)">
-      	<dynamicallyIncludedDestinations>
-      		<queue physicalName="include.test.foo"/>
-      		<topic physicalName="include.test.bar"/>
-      	</dynamicallyIncludedDestinations>
-      	<excludedDestinations>
-      		<queue physicalName="exclude.test.foo"/>
-      		<topic physicalName="exclude.test.bar"/>
-      	</excludedDestinations>
+        <dynamicallyIncludedDestinations>
+          <queue physicalName="include.test.foo"/>
+          <topic physicalName="include.test.bar"/>
+        </dynamicallyIncludedDestinations>
+        <excludedDestinations>
+          <queue physicalName="exclude.test.foo"/>
+          <topic physicalName="exclude.test.bar"/>
+        </excludedDestinations>
       </networkConnector>
     </networkConnectors>