You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2014/02/11 00:14:44 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-5035

Updated Branches:
  refs/heads/trunk 42d006f2b -> 4c38b03d1


https://issues.apache.org/jira/browse/AMQ-5035

Fix and test for variations in configuration settings that result in a
non-region broker as the AdvisoryBroker's next instance. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4c38b03d
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4c38b03d
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4c38b03d

Branch: refs/heads/trunk
Commit: 4c38b03d14f2d8e803d79272ae08d79b52105959
Parents: 42d006f
Author: Timothy Bish <ta...@gmai.com>
Authored: Mon Feb 10 18:14:41 2014 -0500
Committer: Timothy Bish <ta...@gmai.com>
Committed: Mon Feb 10 18:14:41 2014 -0500

----------------------------------------------------------------------
 .../activemq/advisory/AdvisoryBroker.java       |  33 +++++-
 .../org/apache/activemq/bugs/AMQ5035Test.java   |  82 ++++++++++++++
 .../apache/activemq/bugs/amq5035/activemq.xml   | 109 +++++++++++++++++++
 3 files changed, 222 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/4c38b03d/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
index 5c90287..d48bf16 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
@@ -25,6 +25,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 import org.apache.activemq.broker.Broker;
 import org.apache.activemq.broker.BrokerFilter;
+import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
 import org.apache.activemq.broker.region.Destination;
@@ -34,7 +35,21 @@ import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.broker.region.Subscription;
 import org.apache.activemq.broker.region.TopicRegion;
 import org.apache.activemq.broker.region.TopicSubscription;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.ProducerId;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.security.SecurityContext;
 import org.apache.activemq.state.ProducerState;
 import org.apache.activemq.usage.Usage;
@@ -261,7 +276,21 @@ public class AdvisoryBroker extends BrokerFilter {
     @Override
     public void removeSubscription(ConnectionContext context, RemoveSubscriptionInfo info) throws Exception {
         SubscriptionKey key = new SubscriptionKey(context.getClientId(), info.getSubscriptionName());
-        DurableTopicSubscription sub = ((TopicRegion) ((RegionBroker) next).getTopicRegion()).getDurableSubscription(key);
+
+        RegionBroker regionBroker = null;
+        if (next instanceof RegionBroker) {
+            regionBroker = (RegionBroker) next;
+        } else {
+            BrokerService service = next.getBrokerService();
+            regionBroker = (RegionBroker) service.getRegionBroker();
+        }
+
+        if (regionBroker == null) {
+            LOG.warn("Cannot locate a RegionBroker instance to pass along the removeSubscription call");
+            throw new IllegalStateException("No RegionBroker found.");
+        }
+
+        DurableTopicSubscription sub = ((TopicRegion) regionBroker.getTopicRegion()).getDurableSubscription(key);
 
         super.removeSubscription(context, info);
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/4c38b03d/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
new file mode 100644
index 0000000..a253bc8
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ5035Test.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.jmx.BrokerViewMBean;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class AMQ5035Test {
+
+    private static final String CLIENT_ID = "amq-test-client-id";
+    private static final String DURABLE_SUB_NAME = "testDurable";
+
+    private final String xbean = "xbean:";
+    private final String confBase = "src/test/resources/org/apache/activemq/bugs/amq5035";
+
+    private static BrokerService brokerService;
+    private String connectionUri;
+
+    @Before
+    public void setUp() throws Exception {
+        brokerService = BrokerFactory.createBroker(xbean + confBase + "/activemq.xml");
+        connectionUri = brokerService.getTransportConnectorByScheme("tcp").getPublishableConnectString();
+        brokerService.start();
+        brokerService.waitUntilStarted();
+    }
+
+    @After
+    public void tearDown() throws Exception {
+        brokerService.stop();
+        brokerService.waitUntilStopped();
+    }
+
+    @Test
+    public void testFoo() throws Exception {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(connectionUri);
+        Connection connection = factory.createConnection();
+        connection.setClientID(CLIENT_ID);
+        connection.start();
+        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Topic topic = session.createTopic("Test.Topic");
+        MessageConsumer consumer = session.createDurableSubscriber(topic, DURABLE_SUB_NAME);
+        consumer.close();
+
+        BrokerViewMBean brokerView = getBrokerView(DURABLE_SUB_NAME);
+        brokerView.destroyDurableSubscriber(CLIENT_ID, DURABLE_SUB_NAME);
+    }
+
+    private BrokerViewMBean getBrokerView(String testDurable) throws MalformedObjectNameException {
+        ObjectName brokerName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost");
+        BrokerViewMBean view = (BrokerViewMBean) brokerService.getManagementContext().newProxyInstance(brokerName, BrokerViewMBean.class, true);
+        assertNotNull(view);
+        return view;
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/4c38b03d/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml
new file mode 100644
index 0000000..1bd5a42
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/bugs/amq5035/activemq.xml
@@ -0,0 +1,109 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<!-- START SNIPPET: example -->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">
+
+    <!--
+        The <broker> element is used to configure the ActiveMQ broker.
+    -->
+    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
+
+        <destinationPolicy>
+            <policyMap>
+              <policyEntries>
+                <policyEntry topic=">" >
+                    <!-- The constantPendingMessageLimitStrategy is used to prevent
+                         slow topic consumers from blocking producers and affecting other
+                         consumers by limiting the number of messages that are retained.
+                         For more information, see:
+
+                         http://activemq.apache.org/slow-consumer-handling.html
+                    -->
+                  <pendingMessageLimitStrategy>
+                    <constantPendingMessageLimitStrategy limit="1000"/>
+                  </pendingMessageLimitStrategy>
+                </policyEntry>
+              </policyEntries>
+            </policyMap>
+        </destinationPolicy>
+
+        <!--
+            The managementContext is used to configure how ActiveMQ is exposed in
+            JMX. By default, ActiveMQ uses the MBean server that is started by
+            the JVM. For more information, see:
+
+            http://activemq.apache.org/jmx.html
+        -->
+        <managementContext>
+            <managementContext createConnector="false"/>
+        </managementContext>
+
+        <!--
+            Configure message persistence for the broker. The default persistence
+            mechanism is the KahaDB store (identified by the kahaDB tag).
+            For more information, see:
+
+            http://activemq.apache.org/persistence.html
+        -->
+        <persistenceAdapter>
+            <kahaDB directory="${activemq.data}/kahadb"/>
+        </persistenceAdapter>
+
+
+          <!--
+            The systemUsage controls the maximum amount of space the broker will
+            use before disabling caching and/or slowing down producers. For more information, see:
+            http://activemq.apache.org/producer-flow-control.html
+          -->
+          <systemUsage>
+            <systemUsage>
+                <memoryUsage>
+                    <memoryUsage percentOfJvmHeap="70" />
+                </memoryUsage>
+                <storeUsage>
+                    <storeUsage limit="100 gb"/>
+                </storeUsage>
+                <tempUsage>
+                    <tempUsage limit="50 gb"/>
+                </tempUsage>
+            </systemUsage>
+        </systemUsage>
+
+        <!--
+            The transport connectors expose ActiveMQ over a given protocol to
+            clients and other brokers. For more information, see:
+
+            http://activemq.apache.org/configuring-transports.html
+        -->
+        <transportConnectors>
+            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
+            <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
+        </transportConnectors>
+
+        <!-- destroy the spring context on shutdown to stop jetty -->
+        <shutdownHooks>
+            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
+        </shutdownHooks>
+
+    </broker>
+
+</beans>
+<!-- END SNIPPET: example -->