You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/09/02 13:48:22 UTC
svn commit: r691206 - in /activemq/trunk/activemq-core/src/main:
java/org/apache/activemq/broker/ java/org/apache/activemq/broker/ft/
java/org/apache/activemq/broker/region/ resources/
Author: rajdavies
Date: Tue Sep 2 04:48:21 2008
New Revision: 691206
URL: http://svn.apache.org/viewvc?rev=691206&view=rev
Log:
Apply patch for https://issues.apache.org/activemq/browse/AMQ-596
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
activemq/trunk/activemq-core/src/main/resources/activemq.xsd
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Sep 2 04:48:21 2008
@@ -102,7 +102,7 @@
* @version $Revision: 1.1 $
*/
public class BrokerService implements Service {
-
+ protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
public static final String DEFAULT_PORT = "61616";
public static final String LOCAL_HOST_NAME;
public static final String DEFAULT_BROKER_NAME = "localhost";
@@ -117,6 +117,8 @@
private boolean useShutdownHook = true;
private boolean useLoggingForShutdownErrors;
private boolean shutdownOnMasterFailure;
+ private boolean shutdownOnSlaveFailure;
+ private boolean waitForSlave;
private String brokerName = DEFAULT_BROKER_NAME;
private File dataDirectoryFile;
private File tmpDataDirectory;
@@ -1820,6 +1822,19 @@
return context;
}
+ protected void waitForSlave(){
+ try {
+ slaveStartSignal.await();
+ }catch(InterruptedException e){
+ LOG.error("Exception waiting for slave:"+e);
+ }
+ }
+
+ protected void slaveConnectionEstablished(){ // invoked by TransportConnection right after it logs that a slave broker is attached
+ slaveStartSignal.countDown(); // latch is created with a count of 1, so this releases any thread blocked in waitForSlave()
+ }
+
+
/**
* Start all transport and network connections, proxies and bridges
*
@@ -1847,7 +1862,9 @@
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
-
+ if(isWaitForSlave()){
+ waitForSlave();
+ }
for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
NetworkConnector connector = iter.next();
connector.setLocalUri(uri);
@@ -1984,4 +2001,24 @@
this.sslContext = sslContext;
}
+ public boolean isShutdownOnSlaveFailure() { // when true, TransportConnection.serviceTransportException stops the broker and this service if the failing connection belongs to a slave broker
+ return shutdownOnSlaveFailure;
+ }
+
+ public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) { // presumably bound to the 'shutdownOnSlaveFailure' attribute added to activemq.xsd - confirm against the XML binding
+ this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
+ }
+
+ public boolean isWaitForSlave() { // when true, connector start-up calls waitForSlave() and TransportConnection refuses non-master-connector connections while the latch count is still 1
+ return waitForSlave;
+ }
+
+ public void setWaitForSlave(boolean waitForSlave) { // presumably bound to the 'waitForSlave' attribute added to activemq.xsd - confirm against the XML binding
+ this.waitForSlave = waitForSlave;
+ }
+
+ public CountDownLatch getSlaveStartSignal() { // latch starts at 1 and is counted down by slaveConnectionEstablished(); TransportConnection reads getCount()==1 as "slave not attached yet"
+ return slaveStartSignal;
+ }
+
}
\ No newline at end of file
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Sep 2 04:48:21 2008
@@ -87,6 +87,7 @@
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IntrospectionSupport;
import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.ServiceSupport;
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -208,6 +209,20 @@
}
public void serviceTransportException(IOException e) {
+ BrokerService bService=connector.getBrokerService();
+ if(bService.isShutdownOnSlaveFailure()){
+ if(brokerInfo!=null){
+ if(brokerInfo.isSlaveBroker()){
+ LOG.error("Slave has exception: " + e.getMessage()+" shutting down master now.", e);
+ try {
+ broker.stop();
+ bService.stop();
+ }catch(Exception ex){
+ LOG.warn("Failed to stop the master",ex);
+ }
+ }
+ }
+ }
if (!stopping.get()) {
transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) {
@@ -601,7 +616,11 @@
}
public Response processAddConnection(ConnectionInfo info) throws Exception {
-
+ // If waitForSlave is enabled, refuse client connections until the slave has attached; the slave's own (master connector) connection is always let through.
+ if(!info.isBrokerMasterConnector()&&connector.getBrokerService().isWaitForSlave()&&connector.getBrokerService().getSlaveStartSignal().getCount()==1){
+ ServiceSupport.dispose(transport);
+ return new ExceptionResponse(new Exception("Master's slave not attached yet."));
+ }
// Older clients should have been defaulting this field to true.. but they were not.
if( wireFormatInfo!=null && wireFormatInfo.getVersion() <= 2 ) {
info.setClientMaster(true);
@@ -1129,6 +1148,9 @@
masterBroker = new MasterBroker(parent, transport);
masterBroker.startProcessing();
LOG.info("Slave Broker " + info.getBrokerName() + " is attached");
+ BrokerService bService=connector.getBrokerService();
+ bService.slaveConnectionEstablished();
+
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Tue Sep 2 04:48:21 2008
@@ -394,4 +394,8 @@
public Broker getBroker() { // plain accessor for this connector's 'broker' field
return broker;
}
+
+ public BrokerService getBrokerService() { // accessor added so TransportConnection can reach the BrokerService's slave settings and start latch
+ return brokerService;
+ }
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java Tue Sep 2 04:48:21 2008
@@ -139,7 +139,7 @@
} catch (Exception e) {
masterActive.set(false);
LOG.error("Failed to start network bridge: " + e, e);
- }
+ }
}
protected void startBridge() throws Exception {
@@ -148,10 +148,8 @@
connectionInfo.setClientId(idGenerator.generateId());
connectionInfo.setUserName(userName);
connectionInfo.setPassword(password);
+ connectionInfo.setBrokerMasterConnector(true);
localBroker.oneway(connectionInfo);
- ConnectionInfo remoteInfo = new ConnectionInfo();
- connectionInfo.copy(remoteInfo);
- remoteInfo.setBrokerMasterConnector(true);
remoteBroker.oneway(connectionInfo);
sessionInfo = new SessionInfo(connectionInfo, 1);
localBroker.oneway(sessionInfo);
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Tue Sep 2 04:48:21 2008
@@ -392,7 +392,7 @@
try {
context.getBroker().addDestination(context, destination);
- // dest = addDestination(context, destination);
+ dest = addDestination(context, destination);
} catch (DestinationAlreadyExistsException e) {
// if the destination already exists then lets ignore
// this error
Modified: activemq/trunk/activemq-core/src/main/resources/activemq.xsd
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/activemq.xsd?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/activemq.xsd (original)
+++ activemq/trunk/activemq-core/src/main/resources/activemq.xsd Tue Sep 2 04:48:21 2008
@@ -813,6 +813,8 @@
<xs:attribute name='producerSystemUsagePortion' type='xs:integer'/>
<xs:attribute name='regionBroker' type='xs:string'/>
<xs:attribute name='shutdownOnMasterFailure' type='xs:boolean'/>
+ <xs:attribute name='shutdownOnSlaveFailure' type='xs:boolean'/>
+ <xs:attribute name='waitForSlave' type='xs:boolean'/>
<xs:attribute name='splitSystemUsageForProducersConsumers' type='xs:boolean'/>
<xs:attribute name='sslContext' type='xs:string'/>
<xs:attribute name='start' type='xs:boolean'>