You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/31 23:47:25 UTC
svn commit: r1441240 - in /activemq/trunk:
activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
Author: tabish
Date: Thu Jan 31 22:47:25 2013
New Revision: 1441240
URL: http://svn.apache.org/viewvc?rev=1441240&view=rev
Log:
Add a fix for: https://issues.apache.org/jira/browse/AMQ-4147
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java (with props)
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1441240&r1=1441239&r2=1441240&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Jan 31 22:47:25 2013
@@ -922,6 +922,7 @@ public abstract class DemandForwardingBr
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
message.setProducerId(producerInfo.getProducerId());
message.setDestination(md.getDestination());
+ message.setMemoryUsage(null);
if (message.getOriginalTransactionId() == null) {
message.setOriginalTransactionId(message.getTransactionId());
}
Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java?rev=1441240&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java Thu Jan 31 22:47:25 2013
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.concurrent.Semaphore;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DemandForwardingBridgeSupport;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.Wait;
+
+/**
+ * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} when
+ * bridges are VM-to-VM. Specifically, memory usage from the local broker is
+ * manipulated by the remote broker.
+ */
+public class AMQ4147Test extends JmsMultipleBrokersTestSupport {
+ /**
+ * This test demonstrates the bug: namely, when a message is bridged over
+ * the VMTransport, its memory usage continues to refer to the originating
+ * broker. As a result, memory usage is never accounted for on the remote
+ * broker, and the local broker's memory usage is only decreased once the
+ * message is consumed on the remote broker.
+ */
+ public void testVMTransportRemoteMemoryUsage() throws Exception {
+ BrokerService broker1 = createBroker(new URI(
+ "broker:(vm://broker1)/broker1?persistent=false"));
+
+ BrokerService broker2 = createBroker(new URI(
+ "broker:(vm://broker2)/broker2?persistent=false"));
+
+ startAllBrokers();
+
+ // Forward messages from broker1 to broker2 over the VM transport.
+ bridgeBrokers("broker1", "broker2").start();
+
+ // Verify that broker1 and broker2's test queues have no memory usage.
+ ActiveMQDestination testQueue = createDestination(
+ AMQ4147Test.class.getSimpleName() + ".queue", false);
+ final Destination broker1TestQueue = broker1.getDestination(testQueue);
+ final Destination broker2TestQueue = broker2.getDestination(testQueue);
+
+ assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage());
+ assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+ // Produce a message to broker1's test queue and verify that broker1's
+ // memory usage has increased, but broker2 still has no memory usage.
+ sendMessages("broker1", testQueue, 1);
+ assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0);
+ assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+ // Create a consumer on broker2 that is synchronized to allow detection
+ // of "in flight" messages to the consumer.
+ MessageIdList broker2Messages = getBrokerMessages("broker2");
+ final Semaphore consumerReady = new Semaphore(0);
+ final Semaphore consumerProceed = new Semaphore(0);
+
+ broker2Messages.setParent(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ consumerReady.release();
+ try {
+ consumerProceed.acquire();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ });
+
+ createConsumer("broker2", testQueue);
+
+ // Verify that when broker2's consumer receives the message, the memory
+ // usage has moved broker1 to broker2. The first assertion is expected
+ // to fail due to the bug; the try/finally ensures the consumer is
+ // released prior to failure so that the broker can shut down.
+ consumerReady.acquire();
+
+ try {
+ assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker1TestQueue.getMemoryUsage().getUsage() == 0;
+ }
+ }));
+ assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0);
+ } finally {
+ // Consume the message and verify that there is no more memory
+ // usage.
+ consumerProceed.release();
+ }
+
+ assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker1TestQueue.getMemoryUsage().getUsage() == 0;
+ }
+ }));
+ assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker2TestQueue.getMemoryUsage().getUsage() == 0;
+ }
+ }));
+ }
+
+ /**
+ * This test demonstrates that the bug is VMTransport-specific and does not
+ * occur when bridges occur using other protocols.
+ */
+ public void testTcpTransportRemoteMemoryUsage() throws Exception {
+ BrokerService broker1 = createBroker(new URI(
+ "broker:(vm://broker1)/broker1?persistent=false"));
+
+ BrokerService broker2 = createBroker(new URI(
+ "broker:(tcp://localhost:61616)/broker2?persistent=false"));
+
+ startAllBrokers();
+
+ // Forward messages from broker1 to broker2 over the TCP transport.
+ bridgeBrokers("broker1", "broker2").start();
+
+ // Verify that broker1 and broker2's test queues have no memory usage.
+ ActiveMQDestination testQueue = createDestination(
+ AMQ4147Test.class.getSimpleName() + ".queue", false);
+ final Destination broker1TestQueue = broker1.getDestination(testQueue);
+ final Destination broker2TestQueue = broker2.getDestination(testQueue);
+
+ assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage());
+ assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+ // Produce a message to broker1's test queue and verify that broker1's
+ // memory usage has increased, but broker2 still has no memory usage.
+ sendMessages("broker1", testQueue, 1);
+ assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0);
+ assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+ // Create a consumer on broker2 that is synchronized to allow detection
+ // of "in flight" messages to the consumer.
+ MessageIdList broker2Messages = getBrokerMessages("broker2");
+ final Semaphore consumerReady = new Semaphore(0);
+ final Semaphore consumerProceed = new Semaphore(0);
+
+ broker2Messages.setParent(new MessageListener() {
+ @Override
+ public void onMessage(Message message) {
+ consumerReady.release();
+ try {
+ consumerProceed.acquire();
+ } catch (InterruptedException ex) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ });
+
+ createConsumer("broker2", testQueue);
+
+ // Verify that when broker2's consumer receives the message, the memory
+ // usage has moved broker1 to broker2.
+ consumerReady.acquire();
+
+ try {
+ assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker1TestQueue.getMemoryUsage().getUsage() == 0;
+ }
+ }));
+ assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0);
+ } finally {
+ // Consume the message and verify that there is no more memory
+ // usage.
+ consumerProceed.release();
+ }
+
+ // Pause to allow ACK to be processed.
+ assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker1TestQueue.getMemoryUsage().getUsage() == 0;
+ }
+ }));
+ assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisified() throws Exception {
+ return broker2TestQueue.getMemoryUsage().getUsage() == 0;
+ }
+ }));
+ }
+}
Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
------------------------------------------------------------------------------
svn:eol-style = native