You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/01/31 23:47:25 UTC

svn commit: r1441240 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java

Author: tabish
Date: Thu Jan 31 22:47:25 2013
New Revision: 1441240

URL: http://svn.apache.org/viewvc?rev=1441240&view=rev
Log:
Add a fix for: https://issues.apache.org/jira/browse/AMQ-4147

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java   (with props)
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1441240&r1=1441239&r2=1441240&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Jan 31 22:47:25 2013
@@ -922,6 +922,7 @@ public abstract class DemandForwardingBr
         message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(), localBrokerPath));
         message.setProducerId(producerInfo.getProducerId());
         message.setDestination(md.getDestination());
+        message.setMemoryUsage(null);
         if (message.getOriginalTransactionId() == null) {
             message.setOriginalTransactionId(message.getTransactionId());
         }

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java?rev=1441240&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java Thu Jan 31 22:47:25 2013
@@ -0,0 +1,211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.bugs;
+
+import java.net.URI;
+import java.util.concurrent.Semaphore;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.Destination;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.network.DemandForwardingBridgeSupport;
+import org.apache.activemq.util.MessageIdList;
+import org.apache.activemq.util.Wait;
+
+/**
+ * This test demonstrates a bug in {@link DemandForwardingBridgeSupport} when
+ * bridges are VM-to-VM. Specifically, memory usage from the local broker is
+ * manipulated by the remote broker.
+ */
+public class AMQ4147Test extends JmsMultipleBrokersTestSupport {
+    /**
+     * This test demonstrates the bug: namely, when a message is bridged over
+     * the VMTransport, its memory usage continues to refer to the originating
+     * broker. As a result, memory usage is never accounted for on the remote
+     * broker, and the local broker's memory usage is only decreased once the
+     * message is consumed on the remote broker.
+     */
+    public void testVMTransportRemoteMemoryUsage() throws Exception {
+        BrokerService broker1 = createBroker(new URI(
+                "broker:(vm://broker1)/broker1?persistent=false"));
+
+        BrokerService broker2 = createBroker(new URI(
+                "broker:(vm://broker2)/broker2?persistent=false"));
+
+        startAllBrokers();
+
+        // Forward messages from broker1 to broker2 over the VM transport.
+        bridgeBrokers("broker1", "broker2").start();
+
+        // Verify that broker1 and broker2's test queues have no memory usage.
+        ActiveMQDestination testQueue = createDestination(
+                AMQ4147Test.class.getSimpleName() + ".queue", false);
+        final Destination broker1TestQueue = broker1.getDestination(testQueue);
+        final Destination broker2TestQueue = broker2.getDestination(testQueue);
+
+        assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage());
+        assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+        // Produce a message to broker1's test queue and verify that broker1's
+        // memory usage has increased, but broker2 still has no memory usage.
+        sendMessages("broker1", testQueue, 1);
+        assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0);
+        assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+        // Create a consumer on broker2 that is synchronized to allow detection
+        // of "in flight" messages to the consumer.
+        MessageIdList broker2Messages = getBrokerMessages("broker2");
+        final Semaphore consumerReady = new Semaphore(0);
+        final Semaphore consumerProceed = new Semaphore(0);
+
+        broker2Messages.setParent(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                consumerReady.release();
+                try {
+                    consumerProceed.acquire();
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+
+        createConsumer("broker2", testQueue);
+
+        // Verify that when broker2's consumer receives the message, the memory
+        // usage has moved broker1 to broker2. The first assertion is expected
+        // to fail due to the bug; the try/finally ensures the consumer is
+        // released prior to failure so that the broker can shut down.
+        consumerReady.acquire();
+
+        try {
+            assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return broker1TestQueue.getMemoryUsage().getUsage() == 0;
+                }
+            }));
+            assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0);
+        } finally {
+            // Consume the message and verify that there is no more memory
+            // usage.
+            consumerProceed.release();
+        }
+
+        assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker1TestQueue.getMemoryUsage().getUsage() == 0;
+            }
+        }));
+        assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker2TestQueue.getMemoryUsage().getUsage() == 0;
+            }
+        }));
+    }
+
+    /**
+     * This test demonstrates that the bug is VMTransport-specific and does not
+     * occur when bridges occur using other protocols.
+     */
+    public void testTcpTransportRemoteMemoryUsage() throws Exception {
+        BrokerService broker1 = createBroker(new URI(
+                "broker:(vm://broker1)/broker1?persistent=false"));
+
+        BrokerService broker2 = createBroker(new URI(
+                "broker:(tcp://localhost:61616)/broker2?persistent=false"));
+
+        startAllBrokers();
+
+        // Forward messages from broker1 to broker2 over the TCP transport.
+        bridgeBrokers("broker1", "broker2").start();
+
+        // Verify that broker1 and broker2's test queues have no memory usage.
+        ActiveMQDestination testQueue = createDestination(
+                AMQ4147Test.class.getSimpleName() + ".queue", false);
+        final Destination broker1TestQueue = broker1.getDestination(testQueue);
+        final Destination broker2TestQueue = broker2.getDestination(testQueue);
+
+        assertEquals(0, broker1TestQueue.getMemoryUsage().getUsage());
+        assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+        // Produce a message to broker1's test queue and verify that broker1's
+        // memory usage has increased, but broker2 still has no memory usage.
+        sendMessages("broker1", testQueue, 1);
+        assertTrue(broker1TestQueue.getMemoryUsage().getUsage() > 0);
+        assertEquals(0, broker2TestQueue.getMemoryUsage().getUsage());
+
+        // Create a consumer on broker2 that is synchronized to allow detection
+        // of "in flight" messages to the consumer.
+        MessageIdList broker2Messages = getBrokerMessages("broker2");
+        final Semaphore consumerReady = new Semaphore(0);
+        final Semaphore consumerProceed = new Semaphore(0);
+
+        broker2Messages.setParent(new MessageListener() {
+            @Override
+            public void onMessage(Message message) {
+                consumerReady.release();
+                try {
+                    consumerProceed.acquire();
+                } catch (InterruptedException ex) {
+                    Thread.currentThread().interrupt();
+                }
+            }
+        });
+
+        createConsumer("broker2", testQueue);
+
+        // Verify that when broker2's consumer receives the message, the memory
+        // usage has moved broker1 to broker2.
+        consumerReady.acquire();
+
+        try {
+            assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    return broker1TestQueue.getMemoryUsage().getUsage() == 0;
+                }
+            }));
+            assertTrue(broker2TestQueue.getMemoryUsage().getUsage() > 0);
+        } finally {
+            // Consume the message and verify that there is no more memory
+            // usage.
+            consumerProceed.release();
+        }
+
+        // Pause to allow ACK to be processed.
+        assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker1TestQueue.getMemoryUsage().getUsage() == 0;
+            }
+        }));
+        assertTrue("Memory Usage Should be Zero: ", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                return broker2TestQueue.getMemoryUsage().getUsage() == 0;
+            }
+        }));
+    }
+}

Propchange: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4147Test.java
------------------------------------------------------------------------------
    svn:eol-style = native