You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/09/02 13:48:22 UTC

svn commit: r691206 - in /activemq/trunk/activemq-core/src/main: java/org/apache/activemq/broker/ java/org/apache/activemq/broker/ft/ java/org/apache/activemq/broker/region/ resources/

Author: rajdavies
Date: Tue Sep  2 04:48:21 2008
New Revision: 691206

URL: http://svn.apache.org/viewvc?rev=691206&view=rev
Log:
Apply patch for https://issues.apache.org/activemq/browse/AMQ-596

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    activemq/trunk/activemq-core/src/main/resources/activemq.xsd

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Sep  2 04:48:21 2008
@@ -102,7 +102,7 @@
  * @version $Revision: 1.1 $
  */
 public class BrokerService implements Service {
-
+	protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
     public static final String DEFAULT_PORT = "61616";
     public static final String LOCAL_HOST_NAME;
     public static final String DEFAULT_BROKER_NAME = "localhost";
@@ -117,6 +117,8 @@
     private boolean useShutdownHook = true;
     private boolean useLoggingForShutdownErrors;
     private boolean shutdownOnMasterFailure;
+    private boolean shutdownOnSlaveFailure;
+    private boolean waitForSlave;
     private String brokerName = DEFAULT_BROKER_NAME;
     private File dataDirectoryFile;
     private File tmpDataDirectory;
@@ -1820,6 +1822,19 @@
         return context;
     }
 
+    protected void waitForSlave(){
+        try {
+        	slaveStartSignal.await();
+        }catch(InterruptedException e){
+        	LOG.error("Exception waiting for slave:"+e);
+        }
+    }
+    
+    protected void slaveConnectionEstablished(){
+    	slaveStartSignal.countDown();
+    }
+    
+    
     /**
      * Start all transport and network connections, proxies and bridges
      * 
@@ -1847,7 +1862,9 @@
             map.put("network", "true");
             map.put("async", "false");
             uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
-
+            if(isWaitForSlave()){
+            	waitForSlave();
+            }
             for (Iterator<NetworkConnector> iter = getNetworkConnectors().iterator(); iter.hasNext();) {
                 NetworkConnector connector = iter.next();
                 connector.setLocalUri(uri);
@@ -1984,4 +2001,24 @@
         this.sslContext = sslContext;
     }
 
+	public boolean isShutdownOnSlaveFailure() {
+		return shutdownOnSlaveFailure;
+	}
+
+	public void setShutdownOnSlaveFailure(boolean shutdownOnSlaveFailure) {
+		this.shutdownOnSlaveFailure = shutdownOnSlaveFailure;
+	}
+
+	public boolean isWaitForSlave() {
+		return waitForSlave;
+	}
+
+	public void setWaitForSlave(boolean waitForSlave) {
+		this.waitForSlave = waitForSlave;
+	}
+
+	public CountDownLatch getSlaveStartSignal() {
+		return slaveStartSignal;
+	}
+
 }
\ No newline at end of file

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Sep  2 04:48:21 2008
@@ -87,6 +87,7 @@
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.MarshallingSupport;
+import org.apache.activemq.util.ServiceSupport;
 import org.apache.activemq.util.URISupport;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -208,6 +209,20 @@
     }
 
     public void serviceTransportException(IOException e) {
+    	BrokerService bService=connector.getBrokerService();
+    	if(bService.isShutdownOnSlaveFailure()){
+	    	if(brokerInfo!=null){
+		    	if(brokerInfo.isSlaveBroker()){
+		        	LOG.error("Slave has exception: " + e.getMessage()+" shutting down master now.", e);
+		            try {
+		                broker.stop();
+		                bService.stop();
+		        	}catch(Exception ex){
+		                LOG.warn("Failed to stop the master",ex);
+		            }
+		        }
+	    	}
+    	}
         if (!stopping.get()) {
             transportException.set(e);
             if (TRANSPORTLOG.isDebugEnabled()) {
@@ -601,7 +616,11 @@
     }
 
     public Response processAddConnection(ConnectionInfo info) throws Exception {
-        
+    	//If this broker is configured to wait for a slave, reject client connections until the slave has attached; the slave's own (master connector) connection is allowed through.
+    	if(!info.isBrokerMasterConnector()&&connector.getBrokerService().isWaitForSlave()&&connector.getBrokerService().getSlaveStartSignal().getCount()==1){
+    			ServiceSupport.dispose(transport);
+    			return new ExceptionResponse(new Exception("Master's slave not attached yet."));
+    	}
         // Older clients should have been defaulting this field to true.. but they were not. 
         if( wireFormatInfo!=null && wireFormatInfo.getVersion() <= 2 ) {
             info.setClientMaster(true);
@@ -1129,6 +1148,9 @@
             masterBroker = new MasterBroker(parent, transport);
             masterBroker.startProcessing();
             LOG.info("Slave Broker " + info.getBrokerName() + " is attached");
+            BrokerService bService=connector.getBrokerService();
+            bService.slaveConnectionEstablished();
+            
         } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnector.java Tue Sep  2 04:48:21 2008
@@ -394,4 +394,8 @@
     public Broker getBroker() {
         return broker;
     }
+
+	public BrokerService getBrokerService() {
+		return brokerService;
+	}
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java Tue Sep  2 04:48:21 2008
@@ -139,7 +139,7 @@
         } catch (Exception e) {
             masterActive.set(false);
             LOG.error("Failed to start network bridge: " + e, e);
-        }    
+        }   
     }
 
     protected void startBridge() throws Exception {
@@ -148,10 +148,8 @@
         connectionInfo.setClientId(idGenerator.generateId());
         connectionInfo.setUserName(userName);
         connectionInfo.setPassword(password);
+        connectionInfo.setBrokerMasterConnector(true);
         localBroker.oneway(connectionInfo);
-        ConnectionInfo remoteInfo = new ConnectionInfo();
-        connectionInfo.copy(remoteInfo);
-        remoteInfo.setBrokerMasterConnector(true);
         remoteBroker.oneway(connectionInfo);
         sessionInfo = new SessionInfo(connectionInfo, 1);
         localBroker.oneway(sessionInfo);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java Tue Sep  2 04:48:21 2008
@@ -392,7 +392,7 @@
                     try {
 
                         context.getBroker().addDestination(context, destination);
-                        // dest = addDestination(context, destination);
+                        dest = addDestination(context, destination);
                     } catch (DestinationAlreadyExistsException e) {
                         // if the destination already exists then lets ignore
                         // this error

Modified: activemq/trunk/activemq-core/src/main/resources/activemq.xsd
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/resources/activemq.xsd?rev=691206&r1=691205&r2=691206&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/resources/activemq.xsd (original)
+++ activemq/trunk/activemq-core/src/main/resources/activemq.xsd Tue Sep  2 04:48:21 2008
@@ -813,6 +813,8 @@
       <xs:attribute name='producerSystemUsagePortion' type='xs:integer'/>
       <xs:attribute name='regionBroker' type='xs:string'/>
       <xs:attribute name='shutdownOnMasterFailure' type='xs:boolean'/>
+      <xs:attribute name='shutdownOnSlaveFailure' type='xs:boolean'/>
+      <xs:attribute name='waitForSlave' type='xs:boolean'/>
       <xs:attribute name='splitSystemUsageForProducersConsumers' type='xs:boolean'/>
       <xs:attribute name='sslContext' type='xs:string'/>
       <xs:attribute name='start' type='xs:boolean'>