You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2014/02/21 11:25:04 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5070 - broker
blocked on shutdown
Repository: activemq
Updated Branches:
refs/heads/trunk dcbac84a8 -> c6fe94ec0
https://issues.apache.org/jira/browse/AMQ-5070 - broker blocked on shutdown
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c6fe94ec
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c6fe94ec
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c6fe94ec
Branch: refs/heads/trunk
Commit: c6fe94ec0388f2acb169c9e3839ade5dc7c2c65f
Parents: dcbac84
Author: Dejan Bosanac <de...@nighttale.net>
Authored: Fri Feb 21 11:22:00 2014 +0100
Committer: Dejan Bosanac <de...@nighttale.net>
Committed: Fri Feb 21 11:22:11 2014 +0100
----------------------------------------------------------------------
.../activemq/broker/TransportConnection.java | 10 +-
.../activemq/broker/TransportConnector.java | 8 +-
.../xbean/ConnectorXBeanConfigTest.java | 114 +++++++++++++++++--
.../apache/activemq/xbean/connector-test.xml | 26 +++--
4 files changed, 137 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index 972ffe2..65d044b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -144,9 +144,13 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
throw new RuntimeException("Protocol violation - Command corrupted: " + o.toString());
}
Command command = (Command) o;
- Response response = service(command);
- if (response != null && !brokerService.isStopping() ) {
- dispatchSync(response);
+ if (!brokerService.isStopping()) {
+ Response response = service(command);
+ if (response != null && !brokerService.isStopping()) {
+ dispatchSync(response);
+ }
+ } else {
+ throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
}
} finally {
serviceLock.readLock().unlock();
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
index 582bc3f..5aa074b 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnector.java
@@ -216,8 +216,12 @@ public class TransportConnector implements Connector, BrokerServiceAware {
@Override
public void run() {
try {
- Connection connection = createConnection(transport);
- connection.start();
+ if (!brokerService.isStopping()) {
+ Connection connection = createConnection(transport);
+ connection.start();
+ } else {
+ throw new BrokerStoppedException("Broker " + brokerService + " is being stopped");
+ }
} catch (Exception e) {
String remoteHost = transport.getRemoteAddress();
ServiceSupport.dispose(transport);
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
index 4b328dd..ad96459 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/xbean/ConnectorXBeanConfigTest.java
@@ -18,13 +18,9 @@ package org.apache.activemq.xbean;
import java.net.URI;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
-import javax.jms.Connection;
-import javax.jms.Destination;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.Session;
-import javax.jms.TextMessage;
+import javax.jms.*;
import junit.framework.TestCase;
@@ -36,6 +32,7 @@ import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.MessageIdList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -86,13 +83,116 @@ public class ConnectorXBeanConfigTest extends TestCase {
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
conn.start();
Destination dest = new ActiveMQQueue("test");
- MessageProducer producer = sess.createProducer(dest);
MessageConsumer consumer = sess.createConsumer(dest);
+ MessageProducer producer = sess.createProducer(dest);
producer.send(sess.createTextMessage("test"));
TextMessage msg = (TextMessage)consumer.receive(1000);
assertEquals("test", msg.getText());
}
+
+ public void testBrokerWontStop() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost?async=false");
+ factory.setDispatchAsync(false);
+ factory.setAlwaysSessionAsync(false);
+ Connection conn = factory.createConnection();
+ final Session sess = conn.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+ conn.start();
+ final Destination dest = new ActiveMQQueue("TEST");
+ final CountDownLatch stop = new CountDownLatch(1);
+ final CountDownLatch sendSecond = new CountDownLatch(1);
+ final CountDownLatch shutdown = new CountDownLatch(1);
+ final CountDownLatch test = new CountDownLatch(1);
+
+ ActiveMQConnectionFactory testFactory = new ActiveMQConnectionFactory("vm://localhost?async=false");
+ Connection testConn = testFactory.createConnection();
+ testConn.start();
+ Destination testDestination = sess.createQueue("NEW");
+ Session testSess = testConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer testProducer = testSess.createProducer(testDestination);
+
+ final Thread consumerThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ MessageProducer producer = sess.createProducer(dest);
+ producer.send(sess.createTextMessage("msg1"));
+ MessageConsumer consumer = sess.createConsumer(dest);
+ consumer.setMessageListener(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ try {
+ // send a message that will block
+ Thread.sleep(2000);
+ sendSecond.countDown();
+ // try to stop the broker
+ Thread.sleep(5000);
+ stop.countDown();
+ // run the test
+ Thread.sleep(5000);
+ test.countDown();
+ shutdown.await();
+ } catch (InterruptedException ie) {
+ }
+ }
+ });
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ consumerThread.start();
+
+ final Thread producerThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ sendSecond.await();
+ MessageProducer producer = sess.createProducer(dest);
+ producer.send(sess.createTextMessage("msg2"));
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ producerThread.start();
+
+ final Thread stopThread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ stop.await();
+ brokerService.stop();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ stopThread.start();
+
+ test.await();
+ try {
+ testSess.createConsumer(testDestination);
+ fail("Should have failed creating a consumer!");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ try {
+ testProducer.send(testSess.createTextMessage("msg3"));
+ fail("Should have failed sending a message!");
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+
+ shutdown.countDown();
+
+
+ }
+
@Override
protected void setUp() throws Exception {
brokerService = createBroker();
http://git-wip-us.apache.org/repos/asf/activemq/blob/c6fe94ec/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml
index 99dd04f..5fb6403 100644
--- a/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/xbean/connector-test.xml
@@ -27,19 +27,27 @@
<bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
- <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core">
+ <broker useJmx="true" xmlns="http://activemq.apache.org/schema/core" persistent="false">
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry queue=">" producerFlowControl="true" memoryLimit="20mb" optimizedDispatch="true">
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
<networkConnectors>
<networkConnector uri="static://(tcp://localhost:61616)">
- <dynamicallyIncludedDestinations>
- <queue physicalName="include.test.foo"/>
- <topic physicalName="include.test.bar"/>
- </dynamicallyIncludedDestinations>
- <excludedDestinations>
- <queue physicalName="exclude.test.foo"/>
- <topic physicalName="exclude.test.bar"/>
- </excludedDestinations>
+ <dynamicallyIncludedDestinations>
+ <queue physicalName="include.test.foo"/>
+ <topic physicalName="include.test.bar"/>
+ </dynamicallyIncludedDestinations>
+ <excludedDestinations>
+ <queue physicalName="exclude.test.foo"/>
+ <topic physicalName="exclude.test.bar"/>
+ </excludedDestinations>
</networkConnector>
</networkConnectors>