You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2008/10/15 12:10:29 UTC
svn commit: r704846 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/network/ test/java/org/apache/activemq/network/
Author: gtully
Date: Wed Oct 15 03:10:29 2008
New Revision: 704846
URL: http://svn.apache.org/viewvc?rev=704846&view=rev
Log:
resolve AMQ-1977, some more choreography required in duplex case. Fix duplication of deleteAllMessageOnStartup which slows down JDBC test, add some more robustness to DuplexNetworkMBeanTest
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Wed Oct 15 03:10:29 2008
@@ -1606,13 +1606,6 @@
* @throws Exception
*/
protected Broker createRegionBroker() throws Exception {
- // we must start the persistence adaptor before we can create the region
- // broker
- if (this.deleteAllMessagesOnStartup) {
- getPersistenceAdapter().deleteAllMessages();
- }
-// getPersistenceAdapter().start();
-
if (destinationInterceptors == null) {
destinationInterceptors = createDefaultDestinationInterceptor();
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java Wed Oct 15 03:10:29 2008
@@ -439,7 +439,7 @@
if (!checkFoundStart && firstAckedMsg != null)
throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
if (!checkFoundEnd && lastAckedMsg != null)
- throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+firstAckedMsg+" in dispatched-list (end of ack)");
+ throw new JMSException("Unmatched acknowledege: Could not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)");
if (ack.getMessageCount() != checkCount) {
throw new JMSException("Unmatched acknowledege: Expected message count ("+ack.getMessageCount()+
") differs from count in dispatched-list ("+checkCount+")");
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java Wed Oct 15 03:10:29 2008
@@ -60,6 +60,7 @@
ServiceSupport.dispose(this);
}
}
+ LOG.debug("counting down remoteBrokerNameKnownLatch with: " + command);
remoteBrokerNameKnownLatch.countDown();
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Oct 15 03:10:29 2008
@@ -214,6 +214,10 @@
localBroker.start();
remoteBroker.start();
+ if (configuration.isDuplex() && duplexInitiatingConnection == null) {
+ // initiator side of duplex network
+ remoteBrokerNameKnownLatch.await();
+ }
try {
triggerRemoteStartBridge();
} catch (IOException e) {
@@ -229,10 +233,14 @@
protected void triggerLocalStartBridge() throws IOException {
ASYNC_TASKS.execute(new Runnable() {
public void run() {
+ final String originalName = Thread.currentThread().getName();
+ Thread.currentThread().setName("StartLocalBridge: localBroker=" + localBroker);
try {
startLocalBridge();
} catch (Exception e) {
serviceLocalException(e);
+ } finally {
+ Thread.currentThread().setName(originalName);
}
}
});
@@ -241,10 +249,14 @@
protected void triggerRemoteStartBridge() throws IOException {
ASYNC_TASKS.execute(new Runnable() {
public void run() {
+ final String originalName = Thread.currentThread().getName();
+ Thread.currentThread().setName("StartRemotelBridge: localBroker=" + localBroker);
try {
startRemoteBridge();
} catch (Exception e) {
serviceRemoteException(e);
+ } finally {
+ Thread.currentThread().setName(originalName);
}
}
});
@@ -253,7 +265,7 @@
protected void startLocalBridge() throws Exception {
if (localBridgeStarted.compareAndSet(false, true)) {
synchronized (this) {
-
+ LOG.debug("starting local Bridge, localBroker=" + localBroker);
remoteBrokerNameKnownLatch.await();
localConnectionInfo = new ConnectionInfo();
@@ -278,6 +290,7 @@
protected void startRemoteBridge() throws Exception {
if (remoteBridgeStarted.compareAndSet(false, true)) {
+ LOG.debug("starting remote Bridge, localBroker=" + localBroker);
synchronized (this) {
if (!isCreatedByDuplex()) {
BrokerInfo brokerInfo = new BrokerInfo();
@@ -1025,7 +1038,7 @@
static {
ASYNC_TASKS = new ThreadPoolExecutor(0, Integer.MAX_VALUE, 30, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactory() {
public Thread newThread(Runnable runnable) {
- Thread thread = new Thread(runnable, "NetworkBridge: "+runnable);
+ Thread thread = new Thread(runnable, "NetworkBridge");
thread.setDaemon(true);
return thread;
}
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/DuplexNetworkMBeanTest.java Wed Oct 15 03:10:29 2008
@@ -34,7 +34,7 @@
public class DuplexNetworkMBeanTest extends TestCase {
protected static final Log LOG = LogFactory.getLog(DuplexNetworkMBeanTest.class);
- protected final int numRestarts = 5;
+ protected final int numRestarts = 3;
protected BrokerService createBroker() throws Exception {
BrokerService broker = new BrokerService();
@@ -93,6 +93,7 @@
broker.stop();
broker.waitUntilStopped();
assertEquals(0, countMbeans(broker, "stopped"));
+ Thread.sleep(1000);
}
//assertEquals(0, countMbeans(networkedBroker, "NetworkBridge"));
Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=704846&r1=704845&r2=704846&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java Wed Oct 15 03:10:29 2008
@@ -105,7 +105,7 @@
assertNotNull(includedConsumer.receive(1000));
}
- public void xtestConduitBridge() throws Exception {
+ public void testConduitBridge() throws Exception {
MessageConsumer consumer1 = remoteSession.createConsumer(included);
MessageConsumer consumer2 = remoteSession.createConsumer(included);
MessageProducer producer = localSession.createProducer(included);
@@ -122,7 +122,7 @@
assertNull(consumer2.receive(500));
}
- public void xtestDurableStoreAndForward() throws Exception {
+ public void testDurableStoreAndForward() throws Exception {
// create a remote durable consumer
MessageConsumer remoteConsumer = remoteSession.createDurableSubscriber(included, consumerName);
Thread.sleep(1000);