You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2012/11/13 10:31:38 UTC
svn commit: r1408651 - in
/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker:
BrokerService.java TransportConnection.java ft/ jmx/FTConnectorView.java
jmx/FTConnectorViewMBean.java
Author: rajdavies
Date: Tue Nov 13 09:31:37 2012
New Revision: 1408651
URL: http://svn.apache.org/viewvc?rev=1408651&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-4165 - remove pure master/slave functionality
Removed:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ft/
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/FTConnectorView.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/FTConnectorViewMBean.java
Modified:
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1408651&r1=1408650&r2=1408651&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java Tue Nov 13 09:31:37 2012
@@ -21,7 +21,6 @@ import org.apache.activemq.Configuration
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisoryBroker;
import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.ft.MasterConnector;
import org.apache.activemq.broker.jmx.*;
import org.apache.activemq.broker.region.*;
import org.apache.activemq.broker.region.policy.PolicyMap;
@@ -78,7 +77,6 @@ import java.util.concurrent.atomic.Atomi
* @org.apache.xbean.XBean
*/
public class BrokerService implements Service {
- protected CountDownLatch slaveStartSignal = new CountDownLatch(1);
public static final String DEFAULT_PORT = "61616";
public static final String LOCAL_HOST_NAME;
public static final String BROKER_VERSION;
@@ -123,8 +121,6 @@ public class BrokerService implements Se
private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
private final List<Service> services = new ArrayList<Service>();
- private MasterConnector masterConnector;
- private String masterConnectorURI;
private transient Thread shutdownHook;
private String[] transportConnectorURIs;
private String[] networkConnectorURIs;
@@ -397,27 +393,10 @@ public class BrokerService implements Se
}
/**
- * @return Returns the masterConnectorURI.
- */
- public String getMasterConnectorURI() {
- return masterConnectorURI;
- }
-
- /**
- * @param masterConnectorURI
- * The masterConnectorURI to set.
- */
- public void setMasterConnectorURI(String masterConnectorURI) {
- this.masterConnectorURI = masterConnectorURI;
- }
-
- /**
* @return true if this Broker is a slave to a Master
*/
public boolean isSlave() {
- return (masterConnector != null && masterConnector.isSlave()) ||
- (masterConnector != null && masterConnector.isStoppedBeforeStart()) ||
- (masterConnector == null && slave);
+ return slave;
}
public void masterFailed() {
@@ -624,22 +603,11 @@ public class BrokerService implements Se
managedBroker.setContextBroker(broker);
adminView.setBroker(managedBroker);
}
- // see if there is a MasterBroker service and if so, configure
- // it and start it.
- for (Service service : services) {
- if (service instanceof MasterConnector) {
- configureService(service);
- service.start();
- }
- }
- if (!isSlave() && (masterConnector == null || isShutdownOnMasterFailure() == false)) {
+
+ if (!isSlave()) {
startAllConnectors();
}
- if (!stopped.get()) {
- if (isUseJmx() && masterConnector != null) {
- registerFTConnectorMBean(masterConnector);
- }
- }
+
if (ioExceptionHandler == null) {
setIoExceptionHandler(new DefaultIOExceptionHandler());
}
@@ -723,24 +691,7 @@ public class BrokerService implements Se
stopped.set(true);
stoppedLatch.countDown();
}
- if (masterConnectorURI == null) {
- // master start has not finished yet
- if (slaveStartSignal.getCount() == 1) {
- started.set(false);
- slaveStartSignal.countDown();
- }
- } else {
- for (Service service : services) {
- if (service instanceof MasterConnector) {
- MasterConnector mConnector = (MasterConnector) service;
- if (!mConnector.isSlave()) {
- // means should be slave but not connected to master yet
- started.set(false);
- mConnector.stopBeforeConnected();
- }
- }
- }
- }
+
if (this.taskRunnerFactory != null) {
this.taskRunnerFactory.shutdown();
this.taskRunnerFactory = null;
@@ -1794,20 +1745,6 @@ public class BrokerService implements Se
addJmsConnector(jmsBridgeConnectors[i]);
}
}
- for (Service service : services) {
- if (service instanceof MasterConnector) {
- masterServiceExists = true;
- break;
- }
- }
- if (masterConnectorURI != null) {
- if (masterServiceExists) {
- throw new IllegalStateException(
- "Cannot specify masterConnectorURI when a masterConnector is already registered via the services property");
- } else {
- addService(new MasterConnector(masterConnectorURI));
- }
- }
}
protected void checkSystemUsageLimits() throws IOException {
@@ -2005,16 +1942,7 @@ public class BrokerService implements Se
}
}
- protected void registerFTConnectorMBean(MasterConnector connector) throws IOException {
- FTConnectorView view = new FTConnectorView(connector);
- try {
- ObjectName objectName = new ObjectName(getManagementContext().getJmxDomainName() + ":" + "BrokerName="
- + JMXSupport.encodeObjectNamePart(getBrokerName()) + "," + "Type=MasterConnector");
- AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Broker could not be registered in JMX: " + e.getMessage(), e);
- }
- }
+
protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
JmsConnectorView view = new JmsConnectorView(connector);
@@ -2304,20 +2232,6 @@ public class BrokerService implements Se
return BrokerSupport.getConnectionContext(getBroker());
}
- protected void waitForSlave() {
- try {
- if (!slaveStartSignal.await(waitForSlaveTimeout, TimeUnit.MILLISECONDS)) {
- throw new IllegalStateException("Gave up waiting for slave to start after " + waitForSlaveTimeout + " milliseconds.");
- }
- } catch (InterruptedException e) {
- LOG.error("Exception waiting for slave:" + e);
- }
- }
-
- protected void slaveConnectionEstablished() {
- slaveStartSignal.countDown();
- }
-
protected void startManagementContext() throws Exception {
getManagementContext().setBrokerName(brokerName);
getManagementContext().start();
@@ -2351,9 +2265,7 @@ public class BrokerService implements Se
map.put("network", "true");
map.put("async", "false");
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
- if (isWaitForSlave()) {
- waitForSlave();
- }
+
if (!stopped.get()) {
ThreadPoolExecutor networkConnectorStartExecutor = null;
if (isNetworkConnectorStartAsync()) {
@@ -2445,12 +2357,6 @@ public class BrokerService implements Se
BrokerServiceAware serviceAware = (BrokerServiceAware) service;
serviceAware.setBrokerService(this);
}
- if (masterConnector == null) {
- if (service instanceof MasterConnector) {
- masterConnector = (MasterConnector) service;
- supportFailOver = true;
- }
- }
}
public void handleIOException(IOException exception) {
@@ -2621,10 +2527,6 @@ public class BrokerService implements Se
this.waitForSlaveTimeout = waitForSlaveTimeout;
}
- public CountDownLatch getSlaveStartSignal() {
- return slaveStartSignal;
- }
-
/**
* Get the passiveSlave
* @return the passiveSlave
Modified: activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1408651&r1=1408650&r2=1408651&view=diff
==============================================================================
--- activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java Tue Nov 13 09:31:37 2012
@@ -37,7 +37,6 @@ import java.util.concurrent.locks.Reentr
import javax.transaction.xa.XAResource;
import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.ft.MasterBroker;
import org.apache.activemq.broker.region.ConnectionStatistics;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.*;
@@ -86,7 +85,6 @@ public class TransportConnection impleme
protected TaskRunner taskRunner;
protected final AtomicReference<IOException> transportException = new AtomicReference<IOException>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
- private MasterBroker masterBroker;
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
private WireFormatInfo wireFormatInfo;
@@ -645,13 +643,6 @@ public class TransportConnection impleme
}
public Response processAddConnection(ConnectionInfo info) throws Exception {
- // if the broker service has slave attached, wait for the slave to be
- // attached to allow client connection. slave connection is fine
- if (!info.isBrokerMasterConnector() && connector.getBrokerService().isWaitForSlave()
- && connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
- ServiceSupport.dispose(transport);
- return new ExceptionResponse(new Exception("Master's slave not attached yet."));
- }
// Older clients should have been defaulting this field to true.. but
// they were not.
if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
@@ -1023,9 +1014,6 @@ public class TransportConnection impleme
connector.onStopped(this);
try {
synchronized (this) {
- if (masterBroker != null) {
- masterBroker.stop();
- }
if (duplexBridge != null) {
duplexBridge.stop();
}
@@ -1216,19 +1204,7 @@ public class TransportConnection impleme
public Response processBrokerInfo(BrokerInfo info) {
if (info.isSlaveBroker()) {
BrokerService bService = connector.getBrokerService();
- // Do we only support passive slaves - or does the slave want to be
- // passive ?
- boolean passive = bService.isPassiveSlave() || info.isPassiveSlave();
- if (passive == false) {
-
- // stream messages from this broker (the master) to
- // the slave
- MutableBrokerFilter parent = (MutableBrokerFilter) broker.getAdaptor(MutableBrokerFilter.class);
- masterBroker = new MasterBroker(parent, transport);
- masterBroker.startProcessing();
- }
- LOG.info((passive ? "Passive" : "Active") + " Slave Broker " + info.getBrokerName() + " is attached");
- bService.slaveConnectionEstablished();
+ LOG.error(" Slave Brokers are no longer supported - slave trying to attach is: " + info.getBrokerName());
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...