You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2016/01/12 15:00:39 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-6124 -
fix and test - propagate broker info from prestarted backup transport
Repository: activemq
Updated Branches:
refs/heads/master 66cfc7bab -> db1506a59
https://issues.apache.org/jira/browse/AMQ-6124 - fix and test - propagate broker info from prestarted backup transport
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/db1506a5
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/db1506a5
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/db1506a5
Branch: refs/heads/master
Commit: db1506a5921f70134c3b647cec51204f0e1c1416
Parents: 66cfc7b
Author: gtully <ga...@gmail.com>
Authored: Tue Jan 12 14:00:13 2016 +0000
Committer: gtully <ga...@gmail.com>
Committed: Tue Jan 12 14:00:13 2016 +0000
----------------------------------------------------------------------
.../transport/failover/BackupTransport.java | 16 +++++++++++++++-
.../transport/failover/FailoverTransport.java | 1 +
.../failover/FailoverClusterTestSupport.java | 6 ++++++
.../transport/failover/FailoverPriorityTest.java | 5 ++++-
.../failover/FailoverTransportBackupsTest.java | 15 +++++++++++++--
5 files changed, 39 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
index f6df0a4..9c591e2 100644
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
@@ -18,6 +18,7 @@
package org.apache.activemq.transport.failover;
+import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
@@ -29,10 +30,12 @@ class BackupTransport extends DefaultTransportListener{
private Transport transport;
private URI uri;
private boolean disposed;
-
+ private BrokerInfo brokerInfo;
+
BackupTransport(FailoverTransport ft){
this.failoverTransport=ft;
}
+
@Override
public void onException(IOException error) {
this.disposed=true;
@@ -41,6 +44,17 @@ class BackupTransport extends DefaultTransportListener{
}
}
+ @Override
+ public void onCommand(Object command) {
+ if (command instanceof BrokerInfo) {
+ brokerInfo = (BrokerInfo) command;
+ }
+ }
+
+ public BrokerInfo getBrokerInfo() {
+ return brokerInfo;
+ }
+
public Transport getTransport() {
return transport;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
----------------------------------------------------------------------
diff --git a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
index 7f7d7c6..dcb0867 100755
--- a/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
+++ b/activemq-client/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
@@ -991,6 +991,7 @@ public class FailoverTransport implements CompositeTransport {
backups.remove(bt);
transport = bt.getTransport();
uri = bt.getUri();
+ myTransportListener.onCommand(bt.getBrokerInfo());
if (priorityBackup && priorityBackupAvailable) {
Transport old = this.connectedTransport.getAndSet(null);
if (old != null) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
index c5e9665..01dcce4 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTestSupport.java
@@ -110,6 +110,12 @@ public class FailoverClusterTestSupport extends TestCase {
}
}
+ protected void assertBrokerInfo(String brokerName) throws Exception {
+ for (ActiveMQConnection c : connections) {
+ assertEquals(brokerName, c.getBrokerInfo().getBrokerName());
+ }
+ }
+
protected void addBroker(String name, BrokerService brokerService) {
brokers.put(name, brokerService);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
index bed5183..72137dd 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverPriorityTest.java
@@ -52,7 +52,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
createClients(5);
assertAllConnectedTo(urls.get(BROKER_A_NAME));
-
+ assertBrokerInfo(BROKER_A_NAME);
restart(false, BROKER_A_NAME, BROKER_B_NAME);
@@ -169,8 +169,10 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
if (primary) {
assertAllConnectedTo(urls.get(secondaryName));
+ assertBrokerInfo(secondaryName);
} else {
assertAllConnectedTo(urls.get(primaryName));
+ assertBrokerInfo(primaryName);
}
if (primary) {
@@ -186,6 +188,7 @@ public class FailoverPriorityTest extends FailoverClusterTestSupport {
Thread.sleep(5000);
assertAllConnectedTo(urls.get(primaryName));
+ assertBrokerInfo(primaryName);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/db1506a5/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
index b1c8a1b..ed39268 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/transport/failover/FailoverTransportBackupsTest.java
@@ -24,6 +24,8 @@ import java.io.IOException;
import java.net.URI;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.TransportListener;
@@ -111,9 +113,10 @@ public class FailoverTransportBackupsTest {
}
}));
+ assertEquals("conected to..", "1", currentBrokerInfo.getBrokerName());
broker1.stop();
- assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+ assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
@@ -124,9 +127,10 @@ public class FailoverTransportBackupsTest {
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 1);
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 1);
+ assertEquals("conected to..", "2", currentBrokerInfo.getBrokerName());
broker2.stop();
- assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition(){
+ assertTrue("Timed out waiting for Backups to connect.", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
LOG.debug("Current Backup Count = " + failoverTransport.getCurrentBackups());
@@ -136,6 +140,8 @@ public class FailoverTransportBackupsTest {
assertTrue("Incorrect number of Transport interruptions", transportInterruptions >= 2);
assertTrue("Incorrect number of Transport resumptions", transportResumptions >= 2);
+
+ assertEquals("conected to..", "3", currentBrokerInfo.getBrokerName());
}
@Test
@@ -183,6 +189,7 @@ public class FailoverTransportBackupsTest {
return bs;
}
+ BrokerInfo currentBrokerInfo;
protected Transport createTransport(int backups) throws Exception {
String connectionUri = "failover://("+
broker1.getTransportConnectors().get(0).getPublishableConnectString() + "," +
@@ -199,6 +206,10 @@ public class FailoverTransportBackupsTest {
@Override
public void onCommand(Object command) {
LOG.debug("Test Transport Listener received Command: " + command);
+ if (command instanceof BrokerInfo) {
+ currentBrokerInfo = (BrokerInfo) command;
+ LOG.info("BrokerInfo: " + currentBrokerInfo);
+ }
}
@Override