You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2010/03/08 13:48:46 UTC
svn commit: r920306 [1/2] - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/ main/java/org/apache/activemq/advisory/
main/java/org/apache/activemq/broker/
main/java/org/apache/activemq/broker/region/
main/java/org/apache/activemq/co...
Author: rajdavies
Date: Mon Mar 8 12:48:45 2010
New Revision: 920306
URL: http://svn.apache.org/viewvc?rev=920306&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2632
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/RunBroker.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransportFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/fanout/FanoutTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/mock/MockTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/vm/VMTransport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionControlTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v6/ConnectionInfoTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBDurableTopicTest.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/perf/KahaDBQueueTest.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java Mon Mar 8 12:48:45 2010
@@ -215,6 +215,7 @@
this.info = new ConnectionInfo(new ConnectionId(CONNECTION_ID_GENERATOR.generateId()));
this.info.setManageable(true);
+ this.info.setFaultTolerant(transport.isFaultTolerant());
this.connectionSessionId = new SessionId(info.getConnectionId(), -1);
this.transport.setTransportListener(this);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java Mon Mar 8 12:48:45 2010
@@ -440,11 +440,10 @@
String id = getBrokerId() != null ? getBrokerId().getValue() : "NOT_SET";
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, id);
- String[] uris = getBrokerService().getTransportConnectorURIs();
String url = getBrokerService().getVmConnectorURI().toString();
- if (uris != null && uris.length > 0) {
- url = uris[0];
- }
+ if (getBrokerService().getDefaultSocketURI() != null) {
+ url = getBrokerService().getDefaultSocketURI().toString();
+ }
advisoryMessage.setStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_URL, url);
//set the data structure
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Mon Mar 8 12:48:45 2010
@@ -31,7 +31,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.management.MalformedObjectNameException;
@@ -154,6 +153,7 @@
private boolean deleteAllMessagesOnStartup;
private boolean advisorySupport = true;
private URI vmConnectorURI;
+ private URI defaultSocketURI;
private PolicyMap destinationPolicy;
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean stopped = new AtomicBoolean(false);
@@ -1271,6 +1271,28 @@
public void setVmConnectorURI(URI vmConnectorURI) {
this.vmConnectorURI = vmConnectorURI;
}
+
+ public URI getDefaultSocketURI() {
+
+ if (started.get()) {
+ if (this.defaultSocketURI==null) {
+ for (TransportConnector tc:this.transportConnectors) {
+ URI result = null;
+ try {
+ result = tc.getConnectUri();
+ } catch (Exception e) {
+ LOG.warn("Failed to get the ConnectURI for "+tc,e);
+ }
+ if (result != null) {
+ this.defaultSocketURI=result;
+ break;
+ }
+ }
+ }
+ return this.defaultSocketURI;
+ }
+ return null;
+ }
/**
* @return Returns the shutdownOnMasterFailure.
@@ -2007,6 +2029,9 @@
connector.setLocalUri(uri);
connector.setBrokerName(getBrokerName());
connector.setDurableDestinations(durableDestinations);
+ if (getDefaultSocketURI() != null) {
+ connector.setBrokerURL(getDefaultSocketURI().toString());
+ }
connector.start();
}
for (Iterator<ProxyConnector> iter = getProxyConnectors().iterator(); iter.hasNext();) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connection.java Mon Mar 8 12:48:45 2010
@@ -17,10 +17,10 @@
package org.apache.activemq.broker;
import java.io.IOException;
-
import org.apache.activemq.Service;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.command.Response;
/**
@@ -51,6 +51,7 @@
* Services a client command and submits it to the broker.
*
* @param command
+ * @return Response
*/
Response service(Command command);
@@ -110,5 +111,12 @@
* @return
*/
boolean isNetworkConnection();
+
+ /**
+ * @return true if this is a fault tolerant connection
+ */
+ boolean isFaultTolerantConnection();
+
+ void updateClient(ConnectionControl control);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Connector.java Mon Mar 8 12:48:45 2010
@@ -28,8 +28,7 @@
public interface Connector extends Service {
/**
- *
- * @return
+ * @return brokerInfo
*/
BrokerInfo getBrokerInfo();
@@ -37,4 +36,21 @@
* @return the statistics for this connector
*/
ConnectorStatistics getStatistics();
+
+ /**
+ * @return true if client connections should be updated when brokers leave/join a cluster
+ */
+ public boolean isUpdateClusterClients();
+
+ /**
+ * @return true if clients should be re-balanced across the cluster
+ */
+ public boolean isRebalanceClusterClients();
+
+ /**
+ * Update all the connections with information
+ * about the connected brokers in the cluster
+ */
+ public void updateClientClusterInfo();
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Mon Mar 8 12:48:45 2010
@@ -121,7 +121,7 @@
// Used to do async dispatch.. this should perhaps be pushed down into the
// transport layer..
private boolean inServiceException;
- private ConnectionStatistics statistics = new ConnectionStatistics();
+ private final ConnectionStatistics statistics = new ConnectionStatistics();
private boolean manageable;
private boolean slow;
private boolean markedCandidate;
@@ -133,15 +133,15 @@
private boolean pendingStop;
private long timeStamp;
private final AtomicBoolean stopping = new AtomicBoolean(false);
- private CountDownLatch stopped = new CountDownLatch(1);
+ private final CountDownLatch stopped = new CountDownLatch(1);
private final AtomicBoolean asyncException = new AtomicBoolean(false);
private final Map<ProducerId, ProducerBrokerExchange> producerExchanges = new HashMap<ProducerId, ProducerBrokerExchange>();
private final Map<ConsumerId, ConsumerBrokerExchange> consumerExchanges = new HashMap<ConsumerId, ConsumerBrokerExchange>();
- private CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
+ private final CountDownLatch dispatchStoppedLatch = new CountDownLatch(1);
private ConnectionContext context;
private boolean networkConnection;
private boolean faultTolerantConnection;
- private AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
+ private final AtomicInteger protocolVersion = new AtomicInteger(CommandTypes.PROTOCOL_VERSION);
private DemandForwardingBridge duplexBridge;
private final TaskRunnerFactory taskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new SingleTransportConnectionStateRegister();
@@ -168,6 +168,7 @@
this.taskRunnerFactory = taskRunnerFactory;
this.transport = transport;
this.transport.setTransportListener(new DefaultTransportListener() {
+ @Override
public void onCommand(Object o) {
serviceLock.readLock().lock();
try {
@@ -184,6 +185,7 @@
}
}
+ @Override
public void onException(IOException exception) {
serviceLock.readLock().lock();
try {
@@ -241,6 +243,7 @@
public void serviceExceptionAsync(final IOException e) {
if (asyncException.compareAndSet(false, true)) {
new Thread("Async Exception Handler") {
+ @Override
public void run() {
serviceException(e);
}
@@ -654,6 +657,7 @@
}
registerConnectionState(info.getConnectionId(), state);
LOG.debug("Setting up new connection id: " + info.getConnectionId() + ", address: " + getRemoteAddress());
+ this.faultTolerantConnection=info.isFaultTolerant();
// Setup the context.
String clientId = info.getClientId();
context = new ConnectionContext();
@@ -672,6 +676,7 @@
this.manageable = info.isManageable();
state.setContext(context);
state.setConnection(this);
+
try {
broker.addConnection(context, info);
} catch (Exception e) {
@@ -679,9 +684,9 @@
LOG.warn("Failed to add Connection", e);
throw e;
}
- if (info.isManageable() && broker.isFaultTolerantConfiguration()) {
+ if (info.isManageable()) {
// send ConnectionCommand
- ConnectionControl command = new ConnectionControl();
+ ConnectionControl command = this.connector.getConnectionControl();
command.setFaultTolerant(broker.isFaultTolerantConfiguration());
dispatchAsync(command);
}
@@ -867,7 +872,10 @@
}
transport.start();
active = true;
- dispatchAsync(connector.getBrokerInfo());
+ BrokerInfo info = connector.getBrokerInfo().copy();
+ info.setPeerBrokerInfos(this.broker.getPeerBrokerInfos());
+ dispatchAsync(info);
+
connector.onStarted(this);
}
} catch (Exception e) {
@@ -1120,6 +1128,10 @@
public synchronized boolean isNetworkConnection() {
return networkConnection;
}
+
+ public boolean isFaultTolerantConnection() {
+ return this.faultTolerantConnection;
+ }
protected synchronized void setStarting(boolean starting) {
this.starting = starting;
@@ -1222,6 +1234,13 @@
}
return null;
}
+
+ public void updateClient(ConnectionControl control) {
+ if (isActive() && isBlocked() == false && isFaultTolerantConnection() && this.wireFormatInfo != null
+ && this.wireFormatInfo.getVersion() >= 6) {
+ dispatchAsync(control);
+ }
+ }
private ProducerBrokerExchange getProducerBrokerExchange(ProducerId id) {
ProducerBrokerExchange result = producerExchanges.get(id);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Mon Mar 8 12:48:45 2010
@@ -16,10 +16,17 @@
*/
package org.apache.activemq.broker;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.Iterator;
+import java.util.concurrent.CopyOnWriteArrayList;
+import javax.management.ObjectName;
import org.apache.activemq.broker.jmx.ManagedTransportConnector;
import org.apache.activemq.broker.jmx.ManagementContext;
import org.apache.activemq.broker.region.ConnectorStatistics;
import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.ConnectionControl;
import org.apache.activemq.security.MessageAuthorizationPolicy;
import org.apache.activemq.thread.DefaultThreadPools;
import org.apache.activemq.thread.TaskRunnerFactory;
@@ -34,6 +41,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+
import static org.apache.activemq.thread.DefaultThreadPools.*;
import java.io.IOException;
@@ -53,7 +61,6 @@
protected CopyOnWriteArrayList<TransportConnection> connections = new CopyOnWriteArrayList<TransportConnection>();
protected TransportStatusDetector statusDector;
-
private BrokerService brokerService;
private TransportServer server;
private URI uri;
@@ -61,14 +68,16 @@
private TaskRunnerFactory taskRunnerFactory;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private DiscoveryAgent discoveryAgent;
- private ConnectorStatistics statistics = new ConnectorStatistics();
+ private final ConnectorStatistics statistics = new ConnectorStatistics();
private URI discoveryUri;
private URI connectUri;
private String name;
private boolean disableAsyncDispatch;
private boolean enableStatusMonitor = false;
private Broker broker;
-
+ private boolean updateClusterClients=false;
+ private boolean rebalanceClusterClients;
+
public TransportConnector() {
}
@@ -109,6 +118,8 @@
rc.setTaskRunnerFactory(getTaskRunnerFactory());
rc.setUri(getUri());
rc.setBrokerService(brokerService);
+ rc.setUpdateClusterClients(isUpdateClusterClients());
+ rc.setRebalanceClusterClients(isRebalanceClusterClients());
return rc;
}
@@ -193,16 +204,13 @@
}
public void start() throws Exception {
-
- TransportServer server = getServer();
-
+ TransportServer server = getServer();
broker = brokerService.getBroker();
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setBrokerId(broker.getBrokerId());
brokerInfo.setPeerBrokerInfos(broker.getPeerBrokerInfos());
brokerInfo.setFaultTolerantConfiguration(broker.isFaultTolerantConfiguration());
brokerInfo.setBrokerURL(server.getConnectURI().toString());
-
server.setAcceptListener(new TransportAcceptListener() {
public void onAccept(final Transport transport) {
try {
@@ -233,7 +241,6 @@
LOG.debug("Reason: " + error, error);
}
});
-
server.setBrokerInfo(brokerInfo);
server.start();
@@ -366,6 +373,7 @@
this.name = name;
}
+ @Override
public String toString() {
String rc = getName();
if (rc == null) {
@@ -373,6 +381,43 @@
}
return rc;
}
+
+ protected ConnectionControl getConnectionControl() {
+ boolean rebalance = isRebalanceClusterClients();
+ String connectedBrokers = "";
+ String self = "";
+ if (brokerService.getDefaultSocketURI() != null) {
+ self += brokerService.getDefaultSocketURI().toString();
+ self += ",";
+ }
+ if (rebalance == false) {
+ connectedBrokers += self;
+ }
+ if (this.broker.getPeerBrokerInfos() != null) {
+ for (BrokerInfo info : this.broker.getPeerBrokerInfos()) {
+ connectedBrokers += info.getBrokerURL();
+ connectedBrokers += ",";
+ }
+ }
+ if (rebalance) {
+ connectedBrokers += self;
+ }
+
+ ConnectionControl control = new ConnectionControl();
+ control.setConnectedBrokers(connectedBrokers);
+ control.setRebalanceConnection(rebalance);
+ return control;
+
+ }
+
+ public void updateClientClusterInfo() {
+ if (isRebalanceClusterClients() || isUpdateClusterClients()) {
+ ConnectionControl control = getConnectionControl();
+ for (Connection c: this.connections) {
+ c.updateClient(control);
+ }
+ }
+ }
public boolean isDisableAsyncDispatch() {
return disableAsyncDispatch;
@@ -410,4 +455,32 @@
public BrokerService getBrokerService() {
return brokerService;
}
+
+ /**
+ * @return the updateClusterClients
+ */
+ public boolean isUpdateClusterClients() {
+ return this.updateClusterClients;
+ }
+
+ /**
+ * @param updateClusterClients the updateClusterClients to set
+ */
+ public void setUpdateClusterClients(boolean updateClusterClients) {
+ this.updateClusterClients = updateClusterClients;
+ }
+
+ /**
+ * @return the rebalanceClusterClients
+ */
+ public boolean isRebalanceClusterClients() {
+ return this.rebalanceClusterClients;
+ }
+
+ /**
+ * @param rebalanceClusterClients the rebalanceClusterClients to set
+ */
+ public void setRebalanceClusterClients(boolean rebalanceClusterClients) {
+ this.rebalanceClusterClients = rebalanceClusterClients;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Mon Mar 8 12:48:45 2010
@@ -21,6 +21,7 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -34,6 +35,7 @@
import org.apache.activemq.broker.ConsumerBrokerExchange;
import org.apache.activemq.broker.EmptyBroker;
import org.apache.activemq.broker.ProducerBrokerExchange;
+import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQDestination;
@@ -585,12 +587,14 @@
@Override
public synchronized void addBroker(Connection connection, BrokerInfo info) {
brokerInfos.add(info);
+ updateClients();
}
@Override
public synchronized void removeBroker(Connection connection, BrokerInfo info) {
if (info != null) {
brokerInfos.remove(info);
+ updateClients();
}
}
@@ -830,4 +834,11 @@
LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control);
}
}
+
+ protected void updateClients() {
+ List<TransportConnector> connectors = this.brokerService.getTransportConnectors();
+ for (TransportConnector connector : connectors) {
+ connector.updateClientClusterInfo();
+ }
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BaseCommand.java Mon Mar 8 12:48:45 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.command;
import java.util.Map;
-
import org.apache.activemq.util.IntrospectionSupport;
@@ -61,6 +60,7 @@
this.responseRequired = responseRequired;
}
+ @Override
public String toString() {
return toString(null);
}
@@ -104,6 +104,10 @@
public boolean isShutdownInfo() {
return false;
}
+
+ public boolean isConnectionControl() {
+ return false;
+ }
/**
* The endpoint within the transport where this message came from.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Mon Mar 8 12:48:45 2010
@@ -16,13 +16,12 @@
*/
package org.apache.activemq.command;
-import org.apache.activemq.plugin.StatisticsBrokerPlugin;
+import java.io.IOException;
+import java.util.Properties;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import java.io.IOException;
-import java.util.Properties;
/**
* When a client connects to a broker, the broker send the client a BrokerInfo
@@ -49,7 +48,30 @@
long connectionId;
String brokerUploadUrl;
String networkProperties;
+
+ public BrokerInfo copy() {
+ BrokerInfo copy = new BrokerInfo();
+ copy(copy);
+ return copy;
+ }
+
+ private void copy(BrokerInfo copy) {
+ super.copy(copy);
+ copy.brokerId = this.brokerId;
+ copy.brokerURL = this.brokerURL;
+ copy.slaveBroker = this.slaveBroker;
+ copy.masterBroker = this.masterBroker;
+ copy.faultTolerantConfiguration = this.faultTolerantConfiguration;
+ copy.networkConnection = this.networkConnection;
+ copy.duplexConnection = this.duplexConnection;
+ copy.peerBrokerInfos = this.peerBrokerInfos;
+ copy.brokerName = this.brokerName;
+ copy.connectionId = this.connectionId;
+ copy.brokerUploadUrl = this.brokerUploadUrl;
+ copy.networkProperties = this.networkProperties;
+ }
+ @Override
public boolean isBrokerInfo() {
return true;
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/Command.java Mon Mar 8 12:48:45 2010
@@ -52,6 +52,8 @@
boolean isMessageDispatchNotification();
boolean isShutdownInfo();
+
+ boolean isConnectionControl();
Response visit(CommandVisitor visitor) throws Exception;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionControl.java Mon Mar 8 12:48:45 2010
@@ -31,6 +31,9 @@
protected boolean close;
protected boolean exit;
protected boolean faultTolerant;
+ protected String connectedBrokers="";
+ protected String reconnectTo = "";
+ protected boolean rebalanceConnection;
public byte getDataStructureType() {
return DATA_STRUCTURE_TYPE;
@@ -39,6 +42,10 @@
public Response visit(CommandVisitor visitor) throws Exception {
return visitor.processConnectionControl(this);
}
+ @Override
+ public boolean isConnectionControl() {
+ return true;
+ }
/**
* @openwire:property version=1
@@ -114,4 +121,49 @@
public void setSuspend(boolean suspend) {
this.suspend = suspend;
}
+
+ /**
+ * @openwire:property version=6 cache=false
+ * @return connected brokers.
+ */
+ public String getConnectedBrokers() {
+ return this.connectedBrokers;
+ }
+
+ /**
+ * @param connectedBrokers the connectedBrokers to set
+ */
+ public void setConnectedBrokers(String connectedBrokers) {
+ this.connectedBrokers = connectedBrokers;
+ }
+
+ /**
+ * @openwire:property version=6 cache=false
+ * @return the reconnectTo
+ */
+ public String getReconnectTo() {
+ return this.reconnectTo;
+ }
+
+ /**
+ * @param reconnectTo the reconnectTo to set
+ */
+ public void setReconnectTo(String reconnectTo) {
+ this.reconnectTo = reconnectTo;
+ }
+
+ /**
+ * @return the rebalanceConnection
+ * @openwire:property version=6 cache=false
+ */
+ public boolean isRebalanceConnection() {
+ return this.rebalanceConnection;
+ }
+
+ /**
+ * @param rebalanceConnection the rebalanceConnection to set
+ */
+ public void setRebalanceConnection(boolean rebalanceConnection) {
+ this.rebalanceConnection = rebalanceConnection;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ConnectionInfo.java Mon Mar 8 12:48:45 2010
@@ -35,6 +35,7 @@
protected boolean brokerMasterConnector;
protected boolean manageable;
protected boolean clientMaster = true;
+ protected boolean faultTolerant = false;
protected transient Object transportContext;
public ConnectionInfo() {
@@ -65,6 +66,7 @@
copy.manageable = manageable;
copy.clientMaster = clientMaster;
copy.transportContext = transportContext;
+ copy.faultTolerant= faultTolerant;
}
/**
@@ -199,4 +201,19 @@
this.clientMaster = clientMaster;
}
+ /**
+ * @openwire:property version=6 cache=false
+ * @return the faultTolerant
+ */
+ public boolean isFaultTolerant() {
+ return this.faultTolerant;
+ }
+
+ /**
+ * @param faultTolerant the faultTolerant to set
+ */
+ public void setFaultTolerant(boolean faultTolerant) {
+ this.faultTolerant = faultTolerant;
+ }
+
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/PartialCommand.java Mon Mar 8 12:48:45 2010
@@ -117,6 +117,10 @@
public boolean isShutdownInfo() {
return false;
}
+
+ public boolean isConnectionControl() {
+ return false;
+ }
public void setResponseRequired(boolean responseRequired) {
}
@@ -135,7 +139,6 @@
size = data.length;
}
return "PartialCommand[id: " + commandId + " data: " + size + " byte(s)]";
- }
-
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/RemoveSubscriptionInfo.java Mon Mar 8 12:48:45 2010
@@ -47,14 +47,12 @@
/**
* @openwire:property version=1
- * @deprecated
*/
public String getSubcriptionName() {
return subscriptionName;
}
/**
- * @deprecated
*/
public void setSubcriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/SubscriptionInfo.java Mon Mar 8 12:48:45 2010
@@ -76,7 +76,6 @@
/**
* @openwire:property version=1
- * @deprecated
*/
public String getSubcriptionName() {
return subscriptionName;
@@ -84,7 +83,6 @@
/**
* @param subscriptionName *
- * @deprecated
*/
public void setSubcriptionName(String subscriptionName) {
this.subscriptionName = subscriptionName;
@@ -102,16 +100,19 @@
return false;
}
+ @Override
public String toString() {
return IntrospectionSupport.toString(this);
}
+ @Override
public int hashCode() {
int h1 = clientId != null ? clientId.hashCode() : -1;
int h2 = subscriptionName != null ? subscriptionName.hashCode() : -1;
return h1 ^ h2;
}
+ @Override
public boolean equals(Object obj) {
boolean result = false;
if (obj instanceof SubscriptionInfo) {
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/WireFormatInfo.java Mon Mar 8 12:48:45 2010
@@ -23,7 +23,6 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.util.ByteArrayInputStream;
import org.apache.activemq.util.ByteArrayOutputStream;
@@ -290,6 +289,7 @@
return visitor.processWireFormat(this);
}
+ @Override
public String toString() {
Map<String, Object> p = null;
try {
@@ -356,6 +356,10 @@
public boolean isShutdownInfo() {
return false;
}
+
+ public boolean isConnectionControl() {
+ return false;
+ }
public void setCachedMarshalledForm(WireFormat wireFormat, ByteSequence data) {
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Mon Mar 8 12:48:45 2010
@@ -30,7 +30,6 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@@ -131,7 +130,7 @@
private BrokerInfo localBrokerInfo;
private BrokerInfo remoteBrokerInfo;
- private AtomicBoolean started = new AtomicBoolean();
+ private final AtomicBoolean started = new AtomicBoolean();
private TransportConnection duplexInitiatingConnection;
private BrokerService brokerService = null;
@@ -153,11 +152,13 @@
if (started.compareAndSet(false, true)) {
localBroker.setTransportListener(new DefaultTransportListener() {
+ @Override
public void onCommand(Object o) {
Command command = (Command) o;
serviceLocalCommand(command);
}
+ @Override
public void onException(IOException error) {
serviceLocalException(error);
}
@@ -318,6 +319,7 @@
if (!isCreatedByDuplex()) {
BrokerInfo brokerInfo = new BrokerInfo();
brokerInfo.setBrokerName(configuration.getBrokerName());
+ brokerInfo.setBrokerURL(configuration.getBrokerURL());
brokerInfo.setNetworkConnection(true);
brokerInfo.setDuplexConnection(configuration.isDuplex());
// set our properties
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java Mon Mar 8 12:48:45 2010
@@ -17,7 +17,6 @@
package org.apache.activemq.network;
import java.util.List;
-
import org.apache.activemq.command.ActiveMQDestination;
/**
@@ -36,6 +35,7 @@
private int prefetchSize = 1000;
private int networkTTL = 1;
private String brokerName = "localhost";
+ private String brokerURL = "";
private String userName;
private String password;
private String destinationFilter = ">";
@@ -274,4 +274,18 @@
public void setSuppressDuplicateQueueSubscriptions(boolean val) {
suppressDuplicateQueueSubscriptions = val;
}
+
+ /**
+ * @return the brokerURL
+ */
+ public String getBrokerURL() {
+ return this.brokerURL;
+ }
+
+ /**
+ * @param brokerURL the brokerURL to set
+ */
+ public void setBrokerURL(String brokerURL) {
+ this.brokerURL = brokerURL;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionControlMarshaller.java Mon Mar 8 12:48:45 2010
@@ -71,6 +71,9 @@
info.setFaultTolerant(bs.readBoolean());
info.setResume(bs.readBoolean());
info.setSuspend(bs.readBoolean());
+ info.setConnectedBrokers(tightUnmarshalString(dataIn, bs));
+ info.setReconnectTo(tightUnmarshalString(dataIn, bs));
+ info.setRebalanceConnection(bs.readBoolean());
}
@@ -88,6 +91,9 @@
bs.writeBoolean(info.isFaultTolerant());
bs.writeBoolean(info.isResume());
bs.writeBoolean(info.isSuspend());
+ rc += tightMarshalString1(info.getConnectedBrokers(), bs);
+ rc += tightMarshalString1(info.getReconnectTo(), bs);
+ bs.writeBoolean(info.isRebalanceConnection());
return rc + 0;
}
@@ -108,6 +114,9 @@
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
+ tightMarshalString2(info.getConnectedBrokers(), dataOut, bs);
+ tightMarshalString2(info.getReconnectTo(), dataOut, bs);
+ bs.readBoolean();
}
@@ -127,6 +136,9 @@
info.setFaultTolerant(dataIn.readBoolean());
info.setResume(dataIn.readBoolean());
info.setSuspend(dataIn.readBoolean());
+ info.setConnectedBrokers(looseUnmarshalString(dataIn));
+ info.setReconnectTo(looseUnmarshalString(dataIn));
+ info.setRebalanceConnection(dataIn.readBoolean());
}
@@ -144,6 +156,9 @@
dataOut.writeBoolean(info.isFaultTolerant());
dataOut.writeBoolean(info.isResume());
dataOut.writeBoolean(info.isSuspend());
+ looseMarshalString(info.getConnectedBrokers(), dataOut);
+ looseMarshalString(info.getReconnectTo(), dataOut);
+ dataOut.writeBoolean(info.isRebalanceConnection());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v6/ConnectionInfoMarshaller.java Mon Mar 8 12:48:45 2010
@@ -85,6 +85,7 @@
info.setBrokerMasterConnector(bs.readBoolean());
info.setManageable(bs.readBoolean());
info.setClientMaster(bs.readBoolean());
+ info.setFaultTolerant(bs.readBoolean());
}
@@ -105,6 +106,7 @@
bs.writeBoolean(info.isBrokerMasterConnector());
bs.writeBoolean(info.isManageable());
bs.writeBoolean(info.isClientMaster());
+ bs.writeBoolean(info.isFaultTolerant());
return rc + 0;
}
@@ -128,6 +130,7 @@
bs.readBoolean();
bs.readBoolean();
bs.readBoolean();
+ bs.readBoolean();
}
@@ -161,6 +164,7 @@
info.setBrokerMasterConnector(dataIn.readBoolean());
info.setManageable(dataIn.readBoolean());
info.setClientMaster(dataIn.readBoolean());
+ info.setFaultTolerant(dataIn.readBoolean());
}
@@ -181,6 +185,7 @@
dataOut.writeBoolean(info.isBrokerMasterConnector());
dataOut.writeBoolean(info.isManageable());
dataOut.writeBoolean(info.isClientMaster());
+ dataOut.writeBoolean(info.isFaultTolerant());
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/proxy/ProxyConnector.java Mon Mar 8 12:48:45 2010
@@ -21,7 +21,6 @@
import java.net.URISyntaxException;
import java.util.Iterator;
import java.util.concurrent.CopyOnWriteArrayList;
-
import org.apache.activemq.Service;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.Transport;
@@ -46,7 +45,7 @@
private URI remote;
private URI localUri;
private String name;
- private CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
+ private final CopyOnWriteArrayList<ProxyConnection> connections = new CopyOnWriteArrayList<ProxyConnection>();
public void start() throws Exception {
@@ -131,13 +130,14 @@
private Transport createRemoteTransport() throws Exception {
Transport transport = TransportFactory.compositeConnect(remote);
- CompositeTransport ct = (CompositeTransport)transport.narrow(CompositeTransport.class);
+ CompositeTransport ct = transport.narrow(CompositeTransport.class);
if (ct != null && localUri != null) {
- ct.add(new URI[] {localUri});
+ ct.add(false,new URI[] {localUri});
}
// Add a transport filter so that we can track the transport life cycle
transport = new TransportFilter(transport) {
+ @Override
public void stop() throws Exception {
LOG.info("Stopping proxy.");
super.stop();
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CompositeTransport.java Mon Mar 8 12:48:45 2010
@@ -19,6 +19,6 @@
import java.net.URI;
public interface CompositeTransport extends Transport {
- void add(URI[] uris);
- void remove(URI[] uris);
+ void add(boolean rebalance,URI[] uris);
+ void remove(boolean rebalance,URI[] uris);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/Transport.java Mon Mar 8 12:48:45 2010
@@ -18,7 +18,6 @@
import java.io.IOException;
import java.net.URI;
-
import org.apache.activemq.Service;
/**
@@ -148,11 +147,28 @@
boolean isConnected();
/**
+ * @return true if reconnect is supported
+ */
+ boolean isReconnectSupported();
+
+ /**
+ * @return true if updating uris is supported
+ */
+ boolean isUpdateURIsSupported();
+ /**
* reconnect to another location
* @param uri
* @throws IOException on failure or if not supported
*/
void reconnect(URI uri) throws IOException;
+
+ /**
+ * Provide a list of available alternative locations
+ * @param rebalance
+ * @param uris
+ * @throws IOException
+ */
+ void updateURIs(boolean rebalance,URI[] uris) throws IOException;
/**
* Returns a counter which gets incremented as data is read from the transport.
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFilter.java Mon Mar 8 12:48:45 2010
@@ -45,7 +45,8 @@
/**
* @see org.apache.activemq.Service#start()
- * @throws IOException if the next channel has not been set.
+ * @throws IOException
+ * if the next channel has not been set.
*/
public void start() throws Exception {
if (next == null) {
@@ -75,6 +76,7 @@
return next;
}
+ @Override
public String toString() {
return next.toString();
}
@@ -126,19 +128,31 @@
return next.isFaultTolerant();
}
- public boolean isDisposed() {
- return next.isDisposed();
- }
-
- public boolean isConnected() {
+ public boolean isDisposed() {
+ return next.isDisposed();
+ }
+
+ public boolean isConnected() {
return next.isConnected();
}
- public void reconnect(URI uri) throws IOException {
- next.reconnect(uri);
- }
+ public void reconnect(URI uri) throws IOException {
+ next.reconnect(uri);
+ }
public int getReceiveCounter() {
return next.getReceiveCounter();
}
+
+ public boolean isReconnectSupported() {
+ return next.isReconnectSupported();
+ }
+
+ public boolean isUpdateURIsSupported() {
+ return next.isUpdateURIsSupported();
+ }
+
+ public void updateURIs(boolean rebalance,URI[] uris) throws IOException {
+ next.updateURIs(rebalance,uris);
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportSupport.java Mon Mar 8 12:48:45 2010
@@ -18,7 +18,6 @@
import java.io.IOException;
import java.net.URI;
-
import org.apache.activemq.util.ServiceSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -96,8 +95,9 @@
try {
transportListener.onException(e);
} catch (RuntimeException e2) {
- // Handle any unexpected runtime exceptions by debug logging them.
- LOG.debug("Unexpected runtime exception: "+e2, e2);
+ // Handle any unexpected runtime exceptions by debug logging
+ // them.
+ LOG.debug("Unexpected runtime exception: " + e2, e2);
}
}
}
@@ -111,18 +111,28 @@
public boolean isFaultTolerant() {
return false;
}
-
-
- public void reconnect(URI uri) throws IOException {
- throw new IOException("Not supported");
- }
-
- public boolean isDisposed() {
- return isStopped();
- }
-
- public boolean isConnected() {
- return isStarted();
- }
+
+ public void reconnect(URI uri) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ public boolean isReconnectSupported() {
+ return false;
+ }
+
+ public boolean isUpdateURIsSupported() {
+ return false;
+ }
+ public void updateURIs(boolean reblance,URI[] uris) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ public boolean isDisposed() {
+ return isStopped();
+ }
+
+ public boolean isConnected() {
+ return isStarted();
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/discovery/DiscoveryTransport.java Mon Mar 8 12:48:45 2010
@@ -20,7 +20,6 @@
import java.net.URISyntaxException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.CompositeTransport;
import org.apache.activemq.transport.TransportFilter;
@@ -50,6 +49,7 @@
this.next = next;
}
+ @Override
public void start() throws Exception {
if (discoveryAgent == null) {
throw new IllegalStateException("discoveryAgent not configured");
@@ -61,6 +61,7 @@
next.start();
}
+ @Override
public void stop() throws Exception {
ServiceStopper ss = new ServiceStopper();
ss.stop(discoveryAgent);
@@ -75,7 +76,7 @@
URI uri = new URI(url);
serviceURIs.put(event.getServiceName(), uri);
LOG.info("Adding new broker connection URL: " + uri);
- next.add(new URI[] {URISupport.applyParameters(uri, parameters)});
+ next.add(false,new URI[] {URISupport.applyParameters(uri, parameters)});
} catch (URISyntaxException e) {
LOG.warn("Could not connect to remote URI: " + url + " due to bad URI syntax: " + e, e);
}
@@ -85,7 +86,7 @@
public void onServiceRemove(DiscoveryEvent event) {
URI uri = serviceURIs.get(event.getServiceName());
if (uri != null) {
- next.remove(new URI[] {uri});
+ next.remove(false,new URI[] {uri});
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java?rev=920306&r1=920305&r2=920306&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/BackupTransport.java Mon Mar 8 12:48:45 2010
@@ -20,12 +20,11 @@
import java.io.IOException;
import java.net.URI;
-
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
class BackupTransport extends DefaultTransportListener{
- private FailoverTransport failoverTransport;
+ private final FailoverTransport failoverTransport;
private Transport transport;
private URI uri;
private boolean disposed;
@@ -33,10 +32,11 @@
BackupTransport(FailoverTransport ft){
this.failoverTransport=ft;
}
- public void onException(IOException error) {
+ @Override
+ public void onException(IOException error) {
this.disposed=true;
if (failoverTransport!=null) {
- this.failoverTransport.reconnect();
+ this.failoverTransport.reconnect(false);
}
}
@@ -62,11 +62,13 @@
this.disposed = disposed;
}
- public int hashCode() {
+ @Override
+ public int hashCode() {
return uri != null ? uri.hashCode():-1;
}
- public boolean equals(Object obj) {
+ @Override
+ public boolean equals(Object obj) {
if (obj instanceof BackupTransport) {
BackupTransport other = (BackupTransport) obj;
return uri== null && other.uri==null ||