You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/10/15 12:10:29 UTC

svn commit: r704846 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/network/ test/java/org/apache/activemq/network/

Author: gtully
Date: Wed Oct 15 03:10:29 2008
New Revision: 704846

URL: http://svn.apache.org/viewvc?rev=704846&view=rev
Log:
resolve AMQ-1977, some more choreography required in duplex case. Fix duplication of deleteAllMessageOnStartup which slows down JDBC test, add some more robustness to DuplexNetworkMBeanTest

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Oct 15 03:10:29 2008
@@ -1606,13 +1606,6 @@
      * @throws Exception
      */
     protected Broker createRegionBroker() throws Exception {
-        // we must start the persistence adaptor before we can create the region
-        // broker
-        if (this.deleteAllMessagesOnStartup) {
-            getPersistenceAdapter().deleteAllMessages();
-        }
-//        getPersistenceAdapter().start();
-
         if (destinationInterceptors == null) {
             destinationInterceptors = createDefaultDestinationInterceptor();
         }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Oct 15 03:10:29 2008
@@ -439,7 +439,7 @@
 		if (!checkFoundStart && firstAckedMsg != null)
 			throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
 		if (!checkFoundEnd && lastAckedMsg != null)
-			throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (end of ack)");
+			throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)");
 		if (ack.getMessageCount() != checkCount) {
 			throw new JMSException("Unmatched acknowledege: Expected message count ("+ack.getMessageCount()+
 					") differs from count in dispatched-list ("+checkCount+")");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Wed Oct 15 03:10:29 2008
@@ -60,6 +60,7 @@
                     ServiceSupport.dispose(this);
                 }
             }
+            LOG.debug("counting down remoteBrokerNameKnownLatch with: " + command);
             remoteBrokerNameKnownLatch.countDown();
         }
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Oct 15 03:10:29 2008
@@ -214,6 +214,10 @@
 
             localBroker.start();
             remoteBroker.start();
+            if (configuration.isDuplex() && duplexInitiatingConnection == null) {
+                // initiator side of duplex network
+                remoteBrokerNameKnownLatch.await();
+            }
             try {
                 triggerRemoteStartBridge();
             } catch (IOException e) {
@@ -229,10 +233,14 @@
     protected void triggerLocalStartBridge() throws IOException {
         ASYNC_TASKS.execute(new Runnable() {
             public void run() {
+                final String originalName = Thread.currentThread().getName();
+                Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
                 try {
                     startLocalBridge();
                 } catch (Exception e) {
                     serviceLocalException(e);
+                } finally {
+                    Thread.currentThread().setName(originalName);
                 }
             }
         });
@@ -241,10 +249,14 @@
     protected void triggerRemoteStartBridge() throws IOException {
         ASYNC_TASKS.execute(new Runnable() {
             public void run() {
+                final String originalName = Thread.currentThread().getName();
+                Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
                 try {
                     startRemoteBridge();
                 } catch (Exception e) {
                     serviceRemoteException(e);
+                } finally {
+                    Thread.currentThread().setName(originalName);
                 }
             }
         });
@@ -253,7 +265,7 @@
     protected void startLocalBridge() throws Exception {
         if (localBridgeStarted.compareAndSet(false, true)) {
             synchronized (this) {
-
+                LOG.debug("starting local Bridge, localBroker=" + localBroker);
                 remoteBrokerNameKnownLatch.await();
 
                 localConnectionInfo = new ConnectionInfo();
@@ -278,6 +290,7 @@
 
     protected void startRemoteBridge() throws Exception {
         if (remoteBridgeStarted.compareAndSet(false, true)) {
+            LOG.debug("starting remote Bridge, localBroker=" + localBroker);
             synchronized (this) {
                 if (!isCreatedByDuplex()) {
                     BrokerInfo brokerInfo = new BrokerInfo();
@@ -1025,7 +1038,7 @@
     static {
         ASYNC_TASKS =   new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
             public Thread newThread(Runnable runnable) {
-                Thread thread = new Thread(runnable, "NetworkBridge: "+runnable);
+                Thread thread = new Thread(runnable, "NetworkBridge");
                 thread.setDaemon(true);
                 return thread;
             }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java Wed Oct 15 03:10:29 2008
@@ -34,7 +34,7 @@
 public class DuplexNetworkMBeanTest extends TestCase {
 
     protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class);
-    protected final int numRestarts = 5;
+    protected final int numRestarts = 3;
 
     protected BrokerService createBroker() throws Exception {
         BrokerService broker = new BrokerService();
@@ -93,6 +93,7 @@
             broker.stop();
             broker.waitUntilStopped();
             assertEquals(0, countMbeans(broker, "stopped"));
+            Thread.sleep(1000);
         }
         
         //assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Wed Oct 15 03:10:29 2008
@@ -105,7 +105,7 @@
         assertNotNull(includedConsumer.receive(1000));
     }
 
-    public void xtestConduitBridge() throws Exception {
+    public void testConduitBridge() throws Exception {
         MessageConsumer consumer1 = remoteSession.createConsumer(included);
         MessageConsumer consumer2 = remoteSession.createConsumer(included);
         MessageProducer producer = localSession.createProducer(included);
@@ -122,7 +122,7 @@
         assertNull(consumer2.receive(500));
     }
 
-    public void xtestDurableStoreAndForward() throws Exception {
+    public void testDurableStoreAndForward() throws Exception {
         // create a remote durable consumer
         MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);
         Thread.sleep(1000);