You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2010/04/22 14:01:15 UTC

svn commit: r936798 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/transport/tcp/ test/java/org/apache/activemq/bugs/

Author: gtully
Date: Thu Apr 22 12:01:15 2010
New Revision: 936798

URL: http://svn.apache.org/viewvc?rev=936798&view=rev
Log:
resolve hang in purge if message count stats are off and log a warning.. tidy up broker service waitForStarted logic which caused AMQ2102 test to block.. still issue with master/slave sync with AMQ2102

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=936798&r1=936797&r2=936798&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Thu Apr 22 12:01:15 2010
@@ -689,10 +689,10 @@ public class BrokerService implements Se
     }
 
     /**
-     * A helper method to block the caller thread until the broker has been
-     * started
+     * A helper method to block the caller thread until the broker has fully started
+     * @return boolean true if wait succeeded false if broker was not started or was stopped
      */
-    public void waitUntilStarted() {
+    public boolean waitUntilStarted() {
         boolean waitSucceeded = false;
         while (isStarted() && !stopped.get() && !waitSucceeded) {
             try {
@@ -700,6 +700,7 @@ public class BrokerService implements Se
             } catch (InterruptedException ignore) {
             }
         }
+        return waitSucceeded;
     }
 
     // Properties

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=936798&r1=936797&r2=936798&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Thu Apr 22 12:01:15 2010
@@ -940,8 +940,11 @@ public class Queue extends BaseDestinati
                 } catch (IOException e) {
                 }
             }
-
-        } while (!pagedInMessages.isEmpty() || this.destinationStatistics.getMessages().getCount() > 0);
+            // don't spin/hang if stats are out and there is nothing left in the store
+        } while (!list.isEmpty() && this.destinationStatistics.getMessages().getCount() > 0);
+        if (this.destinationStatistics.getMessages().getCount() > 0) {
+            LOG.warn(getActiveMQDestination().getQualifiedName() + " after purge complete, message count stats report: " +  this.destinationStatistics.getMessages().getCount());
+        }
         gc();
         this.destinationStatistics.getMessages().setCount(0);
         getMessages().clear();

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java?rev=936798&r1=936797&r2=936798&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java Thu Apr 22 12:01:15 2010
@@ -186,7 +186,8 @@ public class TcpTransport extends Transp
      */
     @Override
     public String toString() {
-        return "tcp://" + socket.getInetAddress() + ":" + socket.getPort();
+        return "" + (socket.isConnected() ? "tcp://" + socket.getInetAddress() + ":" + socket.getPort()
+                : (localLocation != null ? localLocation : remoteLocation)) ;
     }
 
     /**

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java?rev=936798&r1=936797&r2=936798&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ2102Test.java Thu Apr 22 12:01:15 2010
@@ -397,7 +397,7 @@ public class AMQ2102Test extends Combina
     }
 
     private static void debug(String message) {
-        LOG.debug(message);
+        LOG.info(message);
     }
 
     private static void info(String message) {
@@ -464,7 +464,6 @@ public class AMQ2102Test extends Combina
             }
         };
         t.start();
-        master.waitUntilStarted();
         masterUrl = master.getTransportConnectors().get(0).getConnectUri().toString(); 
         
         debug("masterUrl: " + masterUrl);
@@ -475,6 +474,7 @@ public class AMQ2102Test extends Combina
         slave.setMasterConnectorURI(masterUrl);
         slave.start();
         slave.waitUntilStarted();
+        assertTrue("master started", master.waitUntilStarted());
     }
     
     public void tearDown() throws Exception {