You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2015/05/12 23:24:49 UTC

[3/3] activemq git commit: fix intermittent failure of FailoverStaticNetworkTest. Bridge fails to start triggerStartAsyncNetworkBridgeCreation thread waiting on localBrokerInfo. The command was dropped due to contention between dispatch and peer start. F

fix intermittent failure of FailoverStaticNetworkTest. Bridge fails to start triggerStartAsyncNetworkBridgeCreation thread waiting on localBrokerInfo. The command was dropped due to contention between dispatch and peer start. Fix and test. Relates to https://issues.apache.org/jira/browse/AMQ-3684


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/c89bb7a3
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/c89bb7a3
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/c89bb7a3

Branch: refs/heads/master
Commit: c89bb7a316e3bc499e33d4dd5b14349f82ae1d1f
Parents: 3ef5389
Author: gtully <ga...@gmail.com>
Authored: Tue May 12 22:10:26 2015 +0100
Committer: gtully <ga...@gmail.com>
Committed: Tue May 12 22:10:57 2015 +0100

----------------------------------------------------------------------
 .../activemq/transport/vm/VMTransport.java      | 22 +++++----
 activemq-unit-tests/pom.xml                     |  1 -
 .../transport/vm/VMTransportBrokerNameTest.java | 48 ++++++++++++++++++++
 3 files changed, 61 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/c89bb7a3/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
index ec1a423..e7aa729 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
@@ -92,12 +92,24 @@ public class VMTransport implements Transport, Task {
                 throw new TransportDisposedIOException("Peer (" + peer.toString() + ") disposed.");
             }
 
-            if (peer.async || !peer.started.get()) {
+            if (peer.async) {
                 peer.getMessageQueue().put(command);
                 peer.wakeup();
                 return;
             }
 
+            if (!peer.started.get()) {
+                LinkedBlockingQueue<Object> pending = peer.getMessageQueue();
+                boolean accepted = false;
+                do {
+                    synchronized (peer.started) {
+                        accepted = pending.offer(command);
+                    }
+                } while (!accepted && !peer.started.get());
+                if (accepted) {
+                    return;
+                }
+            }
         } catch (InterruptedException e) {
             InterruptedIOException iioe = new InterruptedIOException(e.getMessage());
             iioe.initCause(e);
@@ -259,14 +271,6 @@ public class VMTransport implements Transport, Task {
         this.transportListener = commandListener;
     }
 
-    public void setMessageQueue(LinkedBlockingQueue<Object> asyncQueue) {
-        synchronized (this) {
-            if (messageQueue == null) {
-                messageQueue = asyncQueue;
-            }
-        }
-    }
-
     public LinkedBlockingQueue<Object> getMessageQueue() throws TransportDisposedIOException {
         LinkedBlockingQueue<Object> result = messageQueue;
         if (result == null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/c89bb7a3/activemq-unit-tests/pom.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/pom.xml b/activemq-unit-tests/pom.xml
index 3ca9b1a..7ea4ce3 100755
--- a/activemq-unit-tests/pom.xml
+++ b/activemq-unit-tests/pom.xml
@@ -583,7 +583,6 @@
                 <exclude>**/amq1490/*</exclude>
                 <exclude>**/archive/*</exclude>
                 <exclude>**/NetworkFailoverTest.*/**</exclude>
-                <exclude>**/vm/VMTransportBrokerTest.*</exclude>
                 <exclude>**/broker/MarshallingBrokerTest.*</exclude>
                 <exclude>**/AMQDeadlockTest3.*</exclude>
                 <!-- https://issues.apache.org/activemq/browse/AMQ-2050 -->

http://git-wip-us.apache.org/repos/asf/activemq/blob/c89bb7a3/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
index ac05393..5183db8 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/vm/VMTransportBrokerNameTest.java
@@ -16,7 +16,10 @@
  */
 package org.apache.activemq.transport.vm;
 
+import java.io.IOException;
 import java.net.URI;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
 
 import javax.jms.Connection;
 
@@ -25,6 +28,10 @@ import junit.framework.TestCase;
 import org.apache.activemq.ActiveMQConnection;
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
 
 public class VMTransportBrokerNameTest extends TestCase {
 
@@ -47,4 +54,45 @@ public class VMTransportBrokerNameTest extends TestCase {
         c1.close();
         c2.close();
     }
+
+    public void testBrokerInfoClientAsync() throws Exception {
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(new URI(vmUrl));
+        ActiveMQConnection c1 = (ActiveMQConnection) cf.createConnection();
+        assertTrue("Transport has name in it: " + c1.getTransport(), c1.getTransport().toString().contains(MY_BROKER));
+
+        for (int i=0;i<20; i++) {
+            final CountDownLatch gotBrokerInfo = new CountDownLatch(1);
+            Transport transport = TransportFactory.connect(new URI("vm://" + MY_BROKER + "?async=false"));
+            transport.setTransportListener(new TransportListener() {
+                @Override
+                public void onCommand(Object command) {
+                    if (command instanceof BrokerInfo) {
+                        gotBrokerInfo.countDown();
+                    }
+                }
+
+                @Override
+                public void onException(IOException error) {
+
+                }
+
+                @Override
+                public void transportInterupted() {
+
+                }
+
+                @Override
+                public void transportResumed() {
+
+                }
+            });
+            transport.start();
+
+            assertTrue("got broker info on iteration:" + i, gotBrokerInfo.await(5, TimeUnit.SECONDS));
+
+            transport.stop();
+        }
+        c1.close();
+    }
 }