You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/02/11 00:14:44 UTC
git commit: https://issues.apache.org/jira/browse/AMQ-5035
Updated Branches:
refs/heads/trunk 42d006f2b -> 4c38b03d1
https://issues.apache.org/jira/browse/AMQ-5035
Fix and test for variations in configuration settings that result in a
non-region broker as the AdvisoryBroker's next instance.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4c38b03d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4c38b03d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4c38b03d
Branch: refs/heads/trunk
Commit: 4c38b03d14f2d8e803d79272ae08d79b52105959
Parents: 42d006f
Author: Timothy Bish <ta...@gmai.com>
Authored: Mon Feb 10 18:14:41 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Mon Feb 10 18:14:41 2014 -0500
----------------------------------------------------------------------
.../activemq/advisory/AdvisoryBroker.java | 33 +++++-
.../org/apache/activemq/bugs/AMQ5035Test.java | 82 ++++++++++++++
.../apache/activemq/bugs/amq5035/activemq.xml | 109 +++++++++++++++++++
3 files changed, 222 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/4c38b03d/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 5c90287..d48bf16 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.ConnectionContext;
import org.apache.activemq.broker.ProducerBrokerExchange;
import org.apache.activemq.broker.region.Destination;
@@ -34,7 +35,21 @@ import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.TopicRegion;
import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
import org.apache.activemq.security.SecurityContext;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.usage.Usage;
@@ -261,7 +276,21 @@ public class AdvisoryBroker extends BrokerFilter {
@Override
public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
- DurableTopicSubscription sub = ((TopicRegion) ((RegionBroker) next).getTopicRegion()).getDurableSubscription(key);
+
+ RegionBroker regionBroker = null;
+ if (next instanceof RegionBroker) {
+ regionBroker = (RegionBroker) next;
+ } else {
+ BrokerService service = next.getBrokerService();
+ regionBroker = (RegionBroker) service.getRegionBroker();
+ }
+
+ if (regionBroker == null) {
+ LOG.warn("Cannot locate a RegionBroker instance to pass along the removeSubscription call");
+ throw new IllegalStateException("No RegionBroker found.");
+ }
+
+ DurableTopicSubscription sub = ((TopicRegion) regionBroker.getTopicRegion()).getDurableSubscription(key);
super.removeSubscription(context, info);
http://git-wip-us.apache.org/repos/asf/activemq/blob/4c38b03d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
new file mode 100644
index 0000000..a253bc8
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ5035Test {
+
+ private static final String CLIENT_ID = "amq-test-client-id";
+ private static final String DURABLE_SUB_NAME = "testDurable";
+
+ private final String xbean = "xbean:";
+ private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq5035";
+
+ private static BrokerService brokerService;
+ private String connectionUri;
+
+ @Before
+ public void setUp() throws Exception {
+ brokerService = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml");
+ connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+ brokerService.start();
+ brokerService.waitUntilStarted();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ brokerService.stop();
+ brokerService.waitUntilStopped();
+ }
+
+ @Test
+ public void testFoo() throws Exception {
+ ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+ Connection connection = factory.createConnection();
+ connection.setClientID(CLIENT_ID);
+ connection.start();
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Topic topic = session.createTopic("Test.Topic");
+ MessageConsumer consumer = session.createDurableSubscriber(topic, DURABLE_SUB_NAME);
+ consumer.close();
+
+ BrokerViewMBean brokerView = getBrokerView(DURABLE_SUB_NAME);
+ brokerView.destroyDurableSubscriber(CLIENT_ID, DURABLE_SUB_NAME);
+ }
+
+ private BrokerViewMBean getBrokerView(String testDurable) throws MalformedObjectNameException {
+ ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
+ BrokerViewMBean view = (BrokerViewMBean) brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
+ assertNotNull(view);
+ return view;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/4c38b03d/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml
new file mode 100644
index 0000000..1bd5a42
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml
@@ -0,0 +1,109 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+ <!--
+ The <broker> element is used to configure the ActiveMQ broker.
+ -->
+ <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
+
+ <destinationPolicy>
+ <policyMap>
+ <policyEntries>
+ <policyEntry topic=">" >
+ <!-- The constantPendingMessageLimitStrategy is used to prevent
+ slow topic consumers to block producers and affect other consumers
+ by limiting the number of messages that are retained
+ For more information, see:
+
+ http://activemq.apache.org/slow-consumer-handling.html
+ -->
+ <pendingMessageLimitStrategy>
+ <constantPendingMessageLimitStrategy limit="1000"/>
+ </pendingMessageLimitStrategy>
+ </policyEntry>
+ </policyEntries>
+ </policyMap>
+ </destinationPolicy>
+
+ <!--
+ The managementContext is used to configure how ActiveMQ is exposed in
+ JMX. By default, ActiveMQ uses the MBean server that is started by
+ the JVM. For more information, see:
+
+ http://activemq.apache.org/jmx.html
+ -->
+ <managementContext>
+ <managementContext createConnector="false"/>
+ </managementContext>
+
+ <!--
+ Configure message persistence for the broker. The default persistence
+ mechanism is the KahaDB store (identified by the kahaDB tag).
+ For more information, see:
+
+ http://activemq.apache.org/persistence.html
+ -->
+ <persistenceAdapter>
+ <kahaDB directory="${activemq.data}/kahadb"/>
+ </persistenceAdapter>
+
+
+ <!--
+ The systemUsage controls the maximum amount of space the broker will
+ use before disabling caching and/or slowing down producers. For more information, see:
+ http://activemq.apache.org/producer-flow-control.html
+ -->
+ <systemUsage>
+ <systemUsage>
+ <memoryUsage>
+ <memoryUsage percentOfJvmHeap="70" />
+ </memoryUsage>
+ <storeUsage>
+ <storeUsage limit="100 gb"/>
+ </storeUsage>
+ <tempUsage>
+ <tempUsage limit="50 gb"/>
+ </tempUsage>
+ </systemUsage>
+ </systemUsage>
+
+ <!--
+ The transport connectors expose ActiveMQ over a given protocol to
+ clients and other brokers. For more information, see:
+
+ http://activemq.apache.org/configuring-transports.html
+ -->
+ <transportConnectors>
+ <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
+ <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
+ </transportConnectors>
+
+ <!-- destroy the spring context on shutdown to stop jetty -->
+ <shutdownHooks>
+ <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
+ </shutdownHooks>
+
+ </broker>
+
+</beans>
+<!-- END SNIPPET: example -->