You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/11/13 10:31:38 UTC

svn commit: r1408651 - in /activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker: BrokerService.java TransportConnection.java ft/ jmx/FTConnectorView.java jmx/FTConnectorViewMBean.java

Author: rajdavies
Date: Tue Nov 13 09:31:37 2012
New Revision: 1408651

URL: http://svn.apache.org/viewvc?rev=1408651&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-4165 - remove pure master/slave functionality

Removed:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ft/
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/FTConnectorView.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/FTConnectorViewMBean.java
Modified:
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
    activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1408651&r1=1408650&r2=1408651&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Nov 13 09:31:37 2012
@@ -21,7 +21,6 @@ import org.apache.activemq.Configuration
 import org.apache.activemq.Service;
 import org.apache.activemq.advisory.AdvisoryBroker;
 import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.ft.MasterConnector;
 import org.apache.activemq.broker.jmx.*;
 import org.apache.activemq.broker.region.*;
 import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -78,7 +77,6 @@ import java.util.concurrent.atomic.Atomi
  * @org.apache.xbean.XBean
  */
 public class BrokerService implements Service {
-    protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
     public static final String DEFAULT_PORT = "61616";
     public static final String LOCAL_HOST_NAME;
     public static final String BROKER_VERSION;
@@ -123,8 +121,6 @@ public class BrokerService implements Se
     private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
     private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
     private final List<Service> services = new ArrayList<Service>();
-    private MasterConnector masterConnector;
-    private String masterConnectorURI;
     private transient Thread shutdownHook;
     private String[] transportConnectorURIs;
     private String[] networkConnectorURIs;
@@ -397,27 +393,10 @@ public class BrokerService implements Se
     }
 
     /**
-     * @return Returns the masterConnectorURI.
-     */
-    public String getMasterConnectorURI() {
-        return masterConnectorURI;
-    }
-
-    /**
-     * @param masterConnectorURI
-     *            The masterConnectorURI to set.
-     */
-    public void setMasterConnectorURI(String masterConnectorURI) {
-        this.masterConnectorURI = masterConnectorURI;
-    }
-
-    /**
      * @return true if this Broker is a slave to a Master
      */
     public boolean isSlave() {
-        return (masterConnector != null && masterConnector.isSlave()) ||
-            (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
-            (masterConnector == null && slave);
+        return slave;
     }
 
     public void masterFailed() {
@@ -624,22 +603,11 @@ public class BrokerService implements Se
             managedBroker.setContextBroker(broker);
             adminView.setBroker(managedBroker);
         }
-        // see if there is a MasterBroker service and if so, configure
-        // it and start it.
-        for (Service service : services) {
-            if (service instanceof MasterConnector) {
-                configureService(service);
-                service.start();
-            }
-        }
-        if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
+
+        if (!isSlave()) {
             startAllConnectors();
         }
-        if (!stopped.get()) {
-            if (isUseJmx() && masterConnector != null) {
-                registerFTConnectorMBean(masterConnector);
-            }
-        }
+
         if (ioExceptionHandler == null) {
             setIoExceptionHandler(new DefaultIOExceptionHandler());
         }
@@ -723,24 +691,7 @@ public class BrokerService implements Se
             stopped.set(true);
             stoppedLatch.countDown();
         }
-        if (masterConnectorURI == null) {
-            // master start has not finished yet
-            if (slaveStartSignal.getCount() == 1) {
-                started.set(false);
-                slaveStartSignal.countDown();
-            }
-        } else {
-            for (Service service : services) {
-                if (service instanceof MasterConnector) {
-                    MasterConnector mConnector = (MasterConnector) service;
-                    if (!mConnector.isSlave()) {
-                        // means should be slave but not connected to master yet
-                        started.set(false);
-                        mConnector.stopBeforeConnected();
-                    }
-                }
-            }
-        }
+
         if (this.taskRunnerFactory != null) {
             this.taskRunnerFactory.shutdown();
             this.taskRunnerFactory = null;
@@ -1794,20 +1745,6 @@ public class BrokerService implements Se
                 addJmsConnector(jmsBridgeConnectors[i]);
             }
         }
-        for (Service service : services) {
-            if (service instanceof MasterConnector) {
-                masterServiceExists = true;
-                break;
-            }
-        }
-        if (masterConnectorURI != null) {
-            if (masterServiceExists) {
-                throw new IllegalStateException(
-                        "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
-            } else {
-                addService(new MasterConnector(masterConnectorURI));
-            }
-        }
     }
 
     protected void checkSystemUsageLimits() throws IOException {
@@ -2005,16 +1942,7 @@ public class BrokerService implements Se
         }
     }
 
-    protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
-        FTConnectorView view = new FTConnectorView(connector);
-        try {
-            ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
-                    + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
-            AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
-        } catch (Throwable e) {
-            throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
-        }
-    }
+
 
     protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
         JmsConnectorView view = new JmsConnectorView(connector);
@@ -2304,20 +2232,6 @@ public class BrokerService implements Se
         return BrokerSupport.getConnectionContext(getBroker());
     }
 
-    protected void waitForSlave() {
-        try {
-            if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
-                throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
-            }
-        } catch (InterruptedException e) {
-            LOG.error("Exception waiting for slave:" + e);
-        }
-    }
-
-    protected void slaveConnectionEstablished() {
-        slaveStartSignal.countDown();
-    }
-
     protected void startManagementContext() throws Exception {
         getManagementContext().setBrokerName(brokerName);
         getManagementContext().start();
@@ -2351,9 +2265,7 @@ public class BrokerService implements Se
             map.put("network", "true");
             map.put("async", "false");
             uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
-            if (isWaitForSlave()) {
-                waitForSlave();
-            }
+
             if (!stopped.get()) {
                 ThreadPoolExecutor networkConnectorStartExecutor = null;
                 if (isNetworkConnectorStartAsync()) {
@@ -2445,12 +2357,6 @@ public class BrokerService implements Se
             BrokerServiceAware serviceAware = (BrokerServiceAware) service;
             serviceAware.setBrokerService(this);
         }
-        if (masterConnector == null) {
-            if (service instanceof MasterConnector) {
-                masterConnector = (MasterConnector) service;
-                supportFailOver = true;
-            }
-        }
     }
 
     public void handleIOException(IOException exception) {
@@ -2621,10 +2527,6 @@ public class BrokerService implements Se
         this.waitForSlaveTimeout = waitForSlaveTimeout;
     }
 
-    public CountDownLatch getSlaveStartSignal() {
-        return slaveStartSignal;
-    }
-
     /**
      * Get the passiveSlave
      * @return the passiveSlave

Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1408651&r1=1408650&r2=1408651&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Nov 13 09:31:37 2012
@@ -37,7 +37,6 @@ import java.util.concurrent.locks.Reentr
 
 import javax.transaction.xa.XAResource;
 import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.ft.MasterBroker;
 import org.apache.activemq.broker.region.ConnectionStatistics;
 import org.apache.activemq.broker.region.RegionBroker;
 import org.apache.activemq.command.*;
@@ -86,7 +85,6 @@ public class TransportConnection impleme
     protected TaskRunner taskRunner;
     protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
     protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
-    private MasterBroker masterBroker;
     private final Transport transport;
     private MessageAuthorizationPolicy messageAuthorizationPolicy;
     private WireFormatInfo wireFormatInfo;
@@ -645,13 +643,6 @@ public class TransportConnection impleme
     }
 
     public Response processAddConnection(ConnectionInfo info) throws Exception {
-        // if the broker service has slave attached, wait for the slave to be
-        // attached to allow client connection. slave connection is fine
-        if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
-                && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
-            ServiceSupport.dispose(transport);
-            return new ExceptionResponse(new Exception("Master's slave not attached yet."));
-        }
         // Older clients should have been defaulting this field to true.. but
         // they were not.
         if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
@@ -1023,9 +1014,6 @@ public class TransportConnection impleme
         connector.onStopped(this);
         try {
             synchronized (this) {
-                if (masterBroker != null) {
-                    masterBroker.stop();
-                }
                 if (duplexBridge != null) {
                     duplexBridge.stop();
                 }
@@ -1216,19 +1204,7 @@ public class TransportConnection impleme
     public Response processBrokerInfo(BrokerInfo info) {
         if (info.isSlaveBroker()) {
             BrokerService bService = connector.getBrokerService();
-            // Do we only support passive slaves - or does the slave want to be
-            // passive ?
-            boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
-            if (passive == false) {
-
-                // stream messages from this broker (the master) to
-                // the slave
-                MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
-                masterBroker = new MasterBroker(parent, transport);
-                masterBroker.startProcessing();
-            }
-            LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
-            bService.slaveConnectionEstablished();
+            LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName());
         } else if (info.isNetworkConnection() && info.isDuplexConnection()) {
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...