You are viewing a plain text version of this content. The canonical link for it is not included in this plain text copy.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/01/12 15:00:39 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-6124 - fix and test - propagate broker info from prestarted backup transport

Repository: activemq
Updated Branches:
  refs/heads/master 66cfc7bab -> db1506a59


https://issues.apache.org/jira/browse/AMQ-6124 - fix and test - propagate broker info from prestarted backup transport


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/db1506a5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/db1506a5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/db1506a5

Branch: refs/heads/master
Commit: db1506a5921f70134c3b647cec51204f0e1c1416
Parents: 66cfc7b
Author: gtully <ga...@gmail.com>
Authored: Tue Jan 12 14:00:13 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Jan 12 14:00:13 2016 +0000

----------------------------------------------------------------------
 .../transport/failover/BackupTransport.java         | 16 +++++++++++++++-
 .../transport/failover/FailoverTransport.java       |  1 +
 .../failover/FailoverClusterTestSupport.java        |  6 ++++++
 .../transport/failover/FailoverPriorityTest.java    |  5 ++++-
 .../failover/FailoverTransportBackupsTest.java      | 15 +++++++++++++--
 5 files changed, 39 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
index f6df0a4..9c591e2 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
@@ -18,6 +18,7 @@
 
 package org.apache.activemq.transport.failover;
 
+import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.transport.DefaultTransportListener;
 import org.apache.activemq.transport.Transport;
 
@@ -29,10 +30,12 @@ class BackupTransport extends DefaultTransportListener{
 	private Transport transport;
 	private URI uri;
 	private boolean disposed;
-	
+	private BrokerInfo brokerInfo;
+
 	BackupTransport(FailoverTransport ft){
 		this.failoverTransport=ft;
 	}
+
 	@Override
     public void onException(IOException error) {
 		this.disposed=true;
@@ -41,6 +44,17 @@ class BackupTransport extends DefaultTransportListener{
 		}
 	}
 
+	@Override
+	public void onCommand(Object command) {
+		if (command instanceof BrokerInfo) {
+			brokerInfo = (BrokerInfo) command;
+		}
+	}
+
+	public BrokerInfo getBrokerInfo() {
+		return brokerInfo;
+	}
+
 	public Transport getTransport() {
 		return transport;
 	}

http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 7f7d7c6..dcb0867 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -991,6 +991,7 @@ public class FailoverTransport implements CompositeTransport {
                             backups.remove(bt);
                             transport = bt.getTransport();
                             uri = bt.getUri();
+                            myTransportListener.onCommand(bt.getBrokerInfo());
                             if (priorityBackup && priorityBackupAvailable) {
                                 Transport old = this.connectedTransport.getAndSet(null);
                                 if (old != null) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
index c5e9665..01dcce4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
@@ -110,6 +110,12 @@ public class FailoverClusterTestSupport extends TestCase {
         }
     }
 
+    protected void assertBrokerInfo(String brokerName) throws Exception {
+        for (ActiveMQConnection c : connections) {
+            assertEquals(brokerName, c.getBrokerInfo().getBrokerName());
+        }
+    }
+
     protected void addBroker(String name, BrokerService brokerService) {
         brokers.put(name, brokerService);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
index bed5183..72137dd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
@@ -52,7 +52,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
         createClients(5);
 
         assertAllConnectedTo(urls.get(BROKER_A_NAME));
-
+        assertBrokerInfo(BROKER_A_NAME);
 
         restart(false, BROKER_A_NAME, BROKER_B_NAME);
 
@@ -169,8 +169,10 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
 
         if (primary) {
             assertAllConnectedTo(urls.get(secondaryName));
+            assertBrokerInfo(secondaryName);
         } else {
             assertAllConnectedTo(urls.get(primaryName));
+            assertBrokerInfo(primaryName);
         }
 
         if (primary) {
@@ -186,6 +188,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
         Thread.sleep(5000);
 
         assertAllConnectedTo(urls.get(primaryName));
+        assertBrokerInfo(primaryName);
 
     }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
index b1c8a1b..ed39268 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
@@ -24,6 +24,8 @@ import java.io.IOException;
 import java.net.URI;
 
 import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
@@ -111,9 +113,10 @@ public class FailoverTransportBackupsTest {
             }
         }));
 
+        assertEquals("conected to..", "1", currentBrokerInfo.getBrokerName());
         broker1.stop();
 
-        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                 LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
@@ -124,9 +127,10 @@ public class FailoverTransportBackupsTest {
         assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
         assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
 
+        assertEquals("conected to..", "2", currentBrokerInfo.getBrokerName());
         broker2.stop();
 
-        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+        assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
             @Override
             public boolean isSatisified() throws Exception {
                 LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
@@ -136,6 +140,8 @@ public class FailoverTransportBackupsTest {
 
         assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 2);
         assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 2);
+
+        assertEquals("conected to..", "3", currentBrokerInfo.getBrokerName());
     }
 
     @Test
@@ -183,6 +189,7 @@ public class FailoverTransportBackupsTest {
         return bs;
     }
 
+    BrokerInfo currentBrokerInfo;
     protected Transport createTransport(int backups) throws Exception {
         String connectionUri = "failover://("+
                                broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
@@ -199,6 +206,10 @@ public class FailoverTransportBackupsTest {
             @Override
             public void onCommand(Object command) {
                 LOG.debug("Test Transport Listener received Command: " + command);
+                if (command instanceof BrokerInfo) {
+                    currentBrokerInfo = (BrokerInfo) command;
+                    LOG.info("BrokerInfo: " + currentBrokerInfo);
+                }
             }
 
             @Override