You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@activemq.apache.org by Gary Tully <ga...@gmail.com> on 2012/11/27 12:27:07 UTC
Re: svn commit: r1413846 - in /activemq/trunk: activemq-broker/src/main/java/org/apache/activemq/broker/
activemq-broker/src/main/java/org/apache/activemq/broker/jmx/
activemq-broker/src/main/java/org/apache/activemq/broker/region/ activemq-core/src/test/j...
there may be too much removed here. The adminview on isSlave was driven by
shared file system master slave, not puremaster slave. One related issue
is https://issues.apache.org/jira/browse/AMQ-3696. not sure where the
original requirement came from. But it makes sense to have some
jmxpresence for a broker waiting to lock a store. Even if the broker
is
waiting to get a shared broker lock, it is nice to see that.
w.r.t the webconsole, the lifecycle of that can be independent of the
broker, so I think it is good that it can reflect a slave status or a
"trying to obtain lock" status.
Maybe the solution here is some jmx instrumentation on the abstract
plugable locker.
On 26 November 2012 21:13, <ch...@apache.org> wrote:
> Author: chirino
> Date: Mon Nov 26 21:13:25 2012
> New Revision: 1413846
>
> URL: http://svn.apache.org/viewvc?rev=1413846&view=rev
> Log:
> Changes for https://issues.apache.org/jira/browse/AMQ-4165 : Remove pure
> master/slave functionality
>
> Removed:
>
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/discovery/MasterSlaveDiscoveryTest.java
> Modified:
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
>
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
>
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
>
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
>
> activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
>
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
>
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java
> Mon Nov 26 21:13:25 2012
> @@ -168,7 +168,6 @@ public class BrokerService implements Se
> private File schedulerDirectoryFile;
> private Scheduler scheduler;
> private ThreadPoolExecutor executor;
> - private boolean slave = true;
> private int schedulePeriodForDestinationPurge= 0;
> private int maxPurgedDestinationsPerSweep = 0;
> private BrokerContext brokerContext;
> @@ -392,13 +391,6 @@ public class BrokerService implements Se
> return null;
> }
>
> - /**
> - * @return true if this Broker is a slave to a Master
> - */
> - public boolean isSlave() {
> - return slave;
> - }
> -
> public void masterFailed() {
> if (shutdownOnMasterFailure) {
> LOG.error("The Master has failed ... shutting down");
> @@ -578,7 +570,6 @@ public class BrokerService implements Se
> if (startException != null) {
> return;
> }
> - slave = false;
> startDestinations();
> addShutdownHook();
>
> @@ -604,9 +595,7 @@ public class BrokerService implements Se
> adminView.setBroker(managedBroker);
> }
>
> - if (!isSlave()) {
> - startAllConnectors();
> - }
> + startAllConnectors();
>
> if (ioExceptionHandler == null) {
> setIoExceptionHandler(new DefaultIOExceptionHandler());
> @@ -680,7 +669,6 @@ public class BrokerService implements Se
> try {
> stopper.stop(persistenceAdapter);
> persistenceAdapter = null;
> - slave = true;
> if (isUseJmx()) {
> stopper.stop(getManagementContext());
> managementContext = null;
> @@ -1227,8 +1215,7 @@ public class BrokerService implements Se
> }
>
> /**
> - * Sets the services associated with this broker such as a
> - * {@link MasterConnector}
> + * Sets the services associated with this broker.
> */
> public void setServices(Service[] services) {
> this.services.clear();
> @@ -2246,82 +2233,80 @@ public class BrokerService implements Se
> * @throws Exception
> */
> public void startAllConnectors() throws Exception {
> - if (!isSlave()) {
> - Set<ActiveMQDestination> durableDestinations =
> getBroker().getDurableDestinations();
> - List<TransportConnector> al = new
> ArrayList<TransportConnector>();
> - for (Iterator<TransportConnector> iter =
> getTransportConnectors().iterator(); iter.hasNext();) {
> - TransportConnector connector = iter.next();
> - connector.setBrokerService(this);
> - al.add(startTransportConnector(connector));
> - }
> - if (al.size() > 0) {
> - // let's clear the transportConnectors list and replace
> it with
> - // the started transportConnector instances
> - this.transportConnectors.clear();
> - setTransportConnectors(al);
> - }
> - URI uri = getVmConnectorURI();
> - Map<String, String> map = new HashMap<String,
> String>(URISupport.parseParameters(uri));
> - map.put("network", "true");
> - map.put("async", "false");
> - uri = URISupport.createURIWithQuery(uri,
> URISupport.createQueryString(map));
> -
> - if (!stopped.get()) {
> - ThreadPoolExecutor networkConnectorStartExecutor = null;
> - if (isNetworkConnectorStartAsync()) {
> - // spin up as many threads as needed
> - networkConnectorStartExecutor = new
> ThreadPoolExecutor(0, Integer.MAX_VALUE,
> - 10, TimeUnit.SECONDS, new
> SynchronousQueue<Runnable>(),
> - new ThreadFactory() {
> - int count=0;
> - public Thread newThread(Runnable
> runnable) {
> - Thread thread = new Thread(runnable,
> "NetworkConnector Start Thread-" +(count++));
> - thread.setDaemon(true);
> - return thread;
> - }
> - });
> - }
> + Set<ActiveMQDestination> durableDestinations =
> getBroker().getDurableDestinations();
> + List<TransportConnector> al = new ArrayList<TransportConnector>();
> + for (Iterator<TransportConnector> iter =
> getTransportConnectors().iterator(); iter.hasNext();) {
> + TransportConnector connector = iter.next();
> + connector.setBrokerService(this);
> + al.add(startTransportConnector(connector));
> + }
> + if (al.size() > 0) {
> + // let's clear the transportConnectors list and replace it
> with
> + // the started transportConnector instances
> + this.transportConnectors.clear();
> + setTransportConnectors(al);
> + }
> + URI uri = getVmConnectorURI();
> + Map<String, String> map = new HashMap<String,
> String>(URISupport.parseParameters(uri));
> + map.put("network", "true");
> + map.put("async", "false");
> + uri = URISupport.createURIWithQuery(uri,
> URISupport.createQueryString(map));
>
> - for (Iterator<NetworkConnector> iter =
> getNetworkConnectors().iterator(); iter.hasNext();) {
> - final NetworkConnector connector = iter.next();
> - connector.setLocalUri(uri);
> - connector.setBrokerName(getBrokerName());
> - connector.setDurableDestinations(durableDestinations);
> - if (getDefaultSocketURIString() != null) {
> -
> connector.setBrokerURL(getDefaultSocketURIString());
> - }
> - if (networkConnectorStartExecutor != null) {
> - networkConnectorStartExecutor.execute(new
> Runnable() {
> - public void run() {
> - try {
> - LOG.info("Async start of " +
> connector);
> - connector.start();
> - } catch(Exception e) {
> - LOG.error("Async start of network
> connector: " + connector + " failed", e);
> - }
> + if (!stopped.get()) {
> + ThreadPoolExecutor networkConnectorStartExecutor = null;
> + if (isNetworkConnectorStartAsync()) {
> + // spin up as many threads as needed
> + networkConnectorStartExecutor = new ThreadPoolExecutor(0,
> Integer.MAX_VALUE,
> + 10, TimeUnit.SECONDS, new
> SynchronousQueue<Runnable>(),
> + new ThreadFactory() {
> + int count=0;
> + public Thread newThread(Runnable runnable) {
> + Thread thread = new Thread(runnable,
> "NetworkConnector Start Thread-" +(count++));
> + thread.setDaemon(true);
> + return thread;
> }
> });
> - } else {
> - connector.start();
> - }
> - }
> - if (networkConnectorStartExecutor != null) {
> - // executor done when enqueued tasks are complete
> -
> ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
> - }
> + }
>
> - for (Iterator<ProxyConnector> iter =
> getProxyConnectors().iterator(); iter.hasNext();) {
> - ProxyConnector connector = iter.next();
> - connector.start();
> + for (Iterator<NetworkConnector> iter =
> getNetworkConnectors().iterator(); iter.hasNext();) {
> + final NetworkConnector connector = iter.next();
> + connector.setLocalUri(uri);
> + connector.setBrokerName(getBrokerName());
> + connector.setDurableDestinations(durableDestinations);
> + if (getDefaultSocketURIString() != null) {
> + connector.setBrokerURL(getDefaultSocketURIString());
> }
> - for (Iterator<JmsConnector> iter =
> jmsConnectors.iterator(); iter.hasNext();) {
> - JmsConnector connector = iter.next();
> + if (networkConnectorStartExecutor != null) {
> + networkConnectorStartExecutor.execute(new Runnable() {
> + public void run() {
> + try {
> + LOG.info("Async start of " + connector);
> + connector.start();
> + } catch(Exception e) {
> + LOG.error("Async start of network
> connector: " + connector + " failed", e);
> + }
> + }
> + });
> + } else {
> connector.start();
> }
> - for (Service service : services) {
> - configureService(service);
> - service.start();
> - }
> + }
> + if (networkConnectorStartExecutor != null) {
> + // executor done when enqueued tasks are complete
> + ThreadPoolUtils.shutdown(networkConnectorStartExecutor);
> + }
> +
> + for (Iterator<ProxyConnector> iter =
> getProxyConnectors().iterator(); iter.hasNext();) {
> + ProxyConnector connector = iter.next();
> + connector.start();
> + }
> + for (Iterator<JmsConnector> iter = jmsConnectors.iterator();
> iter.hasNext();) {
> + JmsConnector connector = iter.next();
> + connector.start();
> + }
> + for (Service service : services) {
> + configureService(service);
> + service.start();
> }
> }
> }
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/ConnectionContext.java
> Mon Nov 26 21:13:25 2012
> @@ -296,13 +296,6 @@ public class ConnectionContext {
> }
>
> /**
> - * @return the slave
> - */
> - public boolean isSlave() {
> - return (this.broker != null &&
> this.broker.getBrokerService().isSlave()) || !this.clientMaster;
> - }
> -
> - /**
> * @return the clientMaster
> */
> public boolean isClientMaster() {
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
> Mon Nov 26 21:13:25 2012
> @@ -186,10 +186,6 @@ public class BrokerView implements Broke
> return brokerService.isPersistent();
> }
>
> - public boolean isSlave() {
> - return brokerService.isSlave();
> - }
> -
> public void terminateJVM(int exitCode) {
> System.exit(exitCode);
> }
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
> Mon Nov 26 21:13:25 2012
> @@ -115,9 +115,6 @@ public interface BrokerViewMBean extends
> @MBeanInfo("Messages are synchronized to disk.")
> boolean isPersistent();
>
> - @MBeanInfo("Slave broker.")
> - boolean isSlave();
> -
> /**
> * Shuts down the JVM.
> *
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/AbstractSubscription.java
> Mon Nov 26 21:13:25 2012
> @@ -116,10 +116,6 @@ public abstract class AbstractSubscripti
> public void gc() {
> }
>
> - public boolean isSlave() {
> - return broker.getBrokerService().isSlave();
> - }
> -
> public ConnectionContext getContext() {
> return context;
> }
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
> Mon Nov 26 21:13:25 2012
> @@ -92,7 +92,7 @@ public abstract class PrefetchSubscripti
> // The slave should not deliver pull messages.
> // TODO: when the slave becomes a master, He should send a NULL
> message to all the
> // consumers to 'wake them up' in case they were waiting for a
> message.
> - if (getPrefetchSize() == 0 && !isSlave()) {
> + if (getPrefetchSize() == 0) {
>
> prefetchExtension.incrementAndGet();
> final long dispatchCounterBeforePull = dispatchCounter;
> @@ -194,13 +194,12 @@ public abstract class PrefetchSubscripti
> boolean callDispatchMatched = false;
> Destination destination = null;
>
> - if (!isSlave()) {
> - if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS))
> {
> - // suppress unexpected ack exception in this expected case
> - LOG.warn("Ignoring ack received before dispatch; result
> of failover with an outstanding ack. Acked messages will be replayed if
> present on this broker. Ignored ack: " + ack);
> - return;
> - }
> + if (!okForAckAsDispatchDone.await(0l, TimeUnit.MILLISECONDS)) {
> + // suppress unexpected ack exception in this expected case
> + LOG.warn("Ignoring ack received before dispatch; result of
> failover with an outstanding ack. Acked messages will be replayed if
> present on this broker. Ignored ack: " + ack);
> + return;
> }
> +
> if (LOG.isTraceEnabled()) {
> LOG.trace("ack:" + ack);
> }
> @@ -413,15 +412,8 @@ public abstract class PrefetchSubscripti
> destination.wakeup();
> dispatchPending();
> } else {
> - if (isSlave()) {
> - throw new JMSException(
> - "Slave broker out of sync with master:
> Acknowledgment ("
> - + ack + ") was not in the dispatch list: "
> - + dispatched);
> - } else {
> - LOG.debug("Acknowledgment out of sync (Normally occurs
> when failover connection reconnects): "
> - + ack);
> - }
> + LOG.debug("Acknowledgment out of sync (Normally occurs when
> failover connection reconnects): "
> + + ack);
> }
> }
>
> @@ -447,11 +439,7 @@ public abstract class PrefetchSubscripti
> @Override
> public void afterRollback() throws Exception {
> synchronized(dispatchLock) {
> - if (isSlave()) {
> -
> ((Destination)node.getRegionDestination()).getDestinationStatistics().getInflight().decrement();
> - } else {
> - // poisionAck will decrement - otherwise
> still inflight on client
> - }
> + // poisionAck will decrement - otherwise
> still inflight on client
> }
> }
> });
> @@ -617,53 +605,51 @@ public abstract class PrefetchSubscripti
> }
>
> protected void dispatchPending() throws IOException {
> - if (!isSlave()) {
> - synchronized(pendingLock) {
> - try {
> - int numberToDispatch = countBeforeFull();
> - if (numberToDispatch > 0) {
> - setSlowConsumer(false);
> - setPendingBatchSize(pending, numberToDispatch);
> - int count = 0;
> - pending.reset();
> - while (pending.hasNext() && !isFull()
> - && count < numberToDispatch) {
> - MessageReference node = pending.next();
> - if (node == null) {
> - break;
> - }
> + synchronized(pendingLock) {
> + try {
> + int numberToDispatch = countBeforeFull();
> + if (numberToDispatch > 0) {
> + setSlowConsumer(false);
> + setPendingBatchSize(pending, numberToDispatch);
> + int count = 0;
> + pending.reset();
> + while (pending.hasNext() && !isFull()
> + && count < numberToDispatch) {
> + MessageReference node = pending.next();
> + if (node == null) {
> + break;
> + }
> +
> + // Synchronize between dispatched list and remove
> of message from pending list
> + // related to remove subscription action
> + synchronized(dispatchLock) {
> + pending.remove();
> + node.decrementReferenceCount();
> + if( !isDropped(node) && canDispatch(node)) {
>
> - // Synchronize between dispatched list and
> remove of message from pending list
> - // related to remove subscription action
> - synchronized(dispatchLock) {
> - pending.remove();
> - node.decrementReferenceCount();
> - if( !isDropped(node) &&
> canDispatch(node)) {
> -
> - // Message may have been sitting in
> the pending
> - // list a while waiting for the
> consumer to ak the message.
> - if
> (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
> - //increment number to dispatch
> - numberToDispatch++;
> - if (broker.isExpired(node)) {
> -
> ((Destination)node.getRegionDestination()).messageExpired(context, this,
> node);
> - }
> - continue;
> + // Message may have been sitting in the
> pending
> + // list a while waiting for the consumer
> to ak the message.
> + if
> (node!=QueueMessageReference.NULL_MESSAGE && node.isExpired()) {
> + //increment number to dispatch
> + numberToDispatch++;
> + if (broker.isExpired(node)) {
> +
> ((Destination)node.getRegionDestination()).messageExpired(context, this,
> node);
> }
> - dispatch(node);
> - count++;
> + continue;
> }
> + dispatch(node);
> + count++;
> }
> }
> - } else if (!isSlowConsumer()) {
> - setSlowConsumer(true);
> - for (Destination dest :destinations) {
> - dest.slowConsumer(context, this);
> - }
> }
> - } finally {
> - pending.release();
> + } else if (!isSlowConsumer()) {
> + setSlowConsumer(true);
> + for (Destination dest :destinations) {
> + dest.slowConsumer(context, this);
> + }
> }
> + } finally {
> + pending.release();
> }
> }
> }
> @@ -682,42 +668,37 @@ public abstract class PrefetchSubscripti
> okForAckAsDispatchDone.countDown();
>
> // No reentrant lock - Patch needed to IndirectMessageReference
> on method lock
> - if (!isSlave()) {
> -
> - MessageDispatch md = createMessageDispatch(node, message);
> - // NULL messages don't count... they don't get Acked.
> - if (node != QueueMessageReference.NULL_MESSAGE) {
> - dispatchCounter++;
> - dispatched.add(node);
> - } else {
> - while (true) {
> - int currentExtension = prefetchExtension.get();
> - int newExtension = Math.max(0, currentExtension - 1);
> - if (prefetchExtension.compareAndSet(currentExtension,
> newExtension)) {
> - break;
> - }
> + MessageDispatch md = createMessageDispatch(node, message);
> + // NULL messages don't count... they don't get Acked.
> + if (node != QueueMessageReference.NULL_MESSAGE) {
> + dispatchCounter++;
> + dispatched.add(node);
> + } else {
> + while (true) {
> + int currentExtension = prefetchExtension.get();
> + int newExtension = Math.max(0, currentExtension - 1);
> + if (prefetchExtension.compareAndSet(currentExtension,
> newExtension)) {
> + break;
> }
> }
> - if (info.isDispatchAsync()) {
> - md.setTransmitCallback(new Runnable() {
> + }
> + if (info.isDispatchAsync()) {
> + md.setTransmitCallback(new Runnable() {
>
> - public void run() {
> - // Since the message gets queued up in async
> dispatch,
> - // we don't want to
> - // decrease the reference count until it gets put
> on the
> - // wire.
> - onDispatch(node, message);
> - }
> - });
> - context.getConnection().dispatchAsync(md);
> - } else {
> - context.getConnection().dispatchSync(md);
> - onDispatch(node, message);
> - }
> - return true;
> + public void run() {
> + // Since the message gets queued up in async dispatch,
> + // we don't want to
> + // decrease the reference count until it gets put on
> the
> + // wire.
> + onDispatch(node, message);
> + }
> + });
> + context.getConnection().dispatchAsync(md);
> } else {
> - return false;
> + context.getConnection().dispatchSync(md);
> + onDispatch(node, message);
> }
> + return true;
> }
>
> protected void onDispatch(final MessageReference node, final Message
> message) {
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
> Mon Nov 26 21:13:25 2012
> @@ -471,13 +471,13 @@ public class Queue extends BaseDestinati
> browserDispatches.add(browserDispatch);
> }
>
> - if (!(this.optimizedDispatch || isSlave())) {
> + if (!this.optimizedDispatch) {
> wakeup();
> }
> }finally {
> pagedInPendingDispatchLock.writeLock().unlock();
> }
> - if (this.optimizedDispatch || isSlave()) {
> + if (this.optimizedDispatch) {
> // Outside of dispatchLock() to maintain the lock hierarchy of
> // iteratingMutex -> dispatchLock. - see
> // https://issues.apache.org/activemq/browse/AMQ-1878
> @@ -578,13 +578,13 @@ public class Queue extends BaseDestinati
> }finally {
> consumersLock.writeLock().unlock();
> }
> - if (!(this.optimizedDispatch || isSlave())) {
> + if (!this.optimizedDispatch) {
> wakeup();
> }
> }finally {
> pagedInPendingDispatchLock.writeLock().unlock();
> }
> - if (this.optimizedDispatch || isSlave()) {
> + if (this.optimizedDispatch) {
> // Outside of dispatchLock() to maintain the lock hierarchy of
> // iteratingMutex -> dispatchLock. - see
> // https://issues.apache.org/activemq/browse/AMQ-1878
> @@ -1704,7 +1704,7 @@ public class Queue extends BaseDestinati
> }
>
> public void wakeup() {
> - if ((optimizedDispatch || isSlave()) && !iterationRunning) {
> + if (optimizedDispatch && !iterationRunning) {
> iterate();
> pendingWakeups.incrementAndGet();
> } else {
> @@ -1721,10 +1721,6 @@ public class Queue extends BaseDestinati
> }
> }
>
> - private boolean isSlave() {
> - return broker.getBrokerService().isSlave();
> - }
> -
> private void doPageIn(boolean force) throws Exception {
> PendingList newlyPaged = doPageInForDispatch(force);
> pagedInPendingDispatchLock.writeLock().lock();
> @@ -1875,7 +1871,7 @@ public class Queue extends BaseDestinati
> consumersLock.writeLock().lock();
>
> try {
> - if (this.consumers.isEmpty() || isSlave()) {
> + if (this.consumers.isEmpty()) {
> // slave dispatch happens in processDispatchNotification
> return list;
> }
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
> Mon Nov 26 21:13:25 2012
> @@ -695,10 +695,6 @@ public class RegionBroker extends EmptyB
> }
> }
>
> - public boolean isSlaveBroker() {
> - return brokerService.isSlave();
> - }
> -
> @Override
> public boolean isStopped() {
> return !started;
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/Subscription.java
> Mon Nov 26 21:13:25 2012
> @@ -109,12 +109,7 @@ public interface Subscription extends Su
> * @throws Exception
> */
> void processMessageDispatchNotification(MessageDispatchNotification
> mdn) throws Exception;
> -
> - /**
> - * @return true if the broker is currently in slave mode
> - */
> - boolean isSlave();
> -
> +
> /**
> * @return number of messages pending delivery
> */
>
> Modified:
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
> (original)
> +++
> activemq/trunk/activemq-broker/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
> Mon Nov 26 21:13:25 2012
> @@ -101,7 +101,7 @@ public class TopicSubscription extends A
> return;
> }
> enqueueCounter.incrementAndGet();
> - if (!isFull() && matched.isEmpty() && !isSlave()) {
> + if (!isFull() && matched.isEmpty()) {
> // if maximumPendingMessages is set we will only discard
> messages which
> // have not been dispatched (i.e. we allow the prefetch
> buffer to be filled)
> dispatch(node);
> @@ -299,7 +299,7 @@ public class TopicSubscription extends A
> public Response pullMessage(ConnectionContext context, MessagePull
> pull) throws Exception {
>
> // The slave should not deliver pull messages.
> - if (getPrefetchSize() == 0 && !isSlave()) {
> + if (getPrefetchSize() == 0 ) {
>
> prefetchWindowOpen.set(true);
> dispatchMatched();
>
> Modified:
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
> (original)
> +++
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/jmx/MBeanTest.java
> Mon Nov 26 21:13:25 2012
> @@ -338,7 +338,6 @@ public class MBeanTest extends EmbeddedB
> ObjectName brokerName = assertRegisteredObjectName(domain +
> ":Type=Broker,BrokerName=localhost");
> BrokerViewMBean broker =
> (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
> brokerName, BrokerViewMBean.class, true);
>
> - assertTrue("broker is not a slave", !broker.isSlave());
> // create 2 topics
> broker.addTopic(getDestinationString() + "1 ");
> broker.addTopic(" " + getDestinationString() + "2");
> @@ -536,7 +535,6 @@ public class MBeanTest extends EmbeddedB
> ObjectName brokerName = assertRegisteredObjectName(domain +
> ":Type=Broker,BrokerName=localhost");
> BrokerViewMBean broker =
> (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
> brokerName, BrokerViewMBean.class, true);
>
> - assertTrue("broker is not a slave", !broker.isSlave());
> // create 2 topics
> broker.addTopic(getDestinationString() + "1");
> broker.addTopic(getDestinationString() + "2");
> @@ -588,7 +586,6 @@ public class MBeanTest extends EmbeddedB
> ObjectName brokerName = assertRegisteredObjectName(domain +
> ":Type=Broker,BrokerName=localhost");
> BrokerViewMBean broker =
> (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
> brokerName, BrokerViewMBean.class, true);
>
> - assertTrue("broker is not a slave", !broker.isSlave());
> // create 2 topics
> broker.addTopic(getDestinationString() + "1");
> broker.addTopic(getDestinationString() + "2");
> @@ -797,7 +794,6 @@ public class MBeanTest extends EmbeddedB
> ObjectName brokerName = assertRegisteredObjectName(domain +
> ":Type=Broker,BrokerName=localhost");
> BrokerViewMBean broker =
> (BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(mbeanServer,
> brokerName, BrokerViewMBean.class, true);
>
> - assertTrue("broker is not a slave", !broker.isSlave());
> assertEquals(0, broker.getDynamicDestinationProducers().length);
>
> Session session = connection.createSession(false,
> Session.AUTO_ACKNOWLEDGE);
>
> Modified:
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
> (original)
> +++
> activemq/trunk/activemq-core/src/test/java/org/apache/activemq/broker/region/QueueDuplicatesFromStoreTest.java
> Mon Nov 26 21:13:25 2012
> @@ -234,10 +234,6 @@ public class QueueDuplicatesFromStoreTes
> return false;
> }
>
> - public boolean isSlave() {
> - return false;
> - }
> -
> public boolean matches(MessageReference node,
> MessageEvaluationContext context) throws IOException {
> return true;
>
> Modified:
> activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
> (original)
> +++
> activemq/trunk/activemq-web-console/src/main/java/org/apache/activemq/web/filter/ApplicationContextFilter.java
> Mon Nov 26 21:13:25 2012
> @@ -66,7 +66,6 @@ public class ApplicationContextFilter im
> private String applicationContextName = "applicationContext";
> private String requestContextName = "requestContext";
> private String requestName = "request";
> - private final String slavePage = "slave.jsp";
>
> public void init(FilterConfig config) throws ServletException {
> this.servletContext = config.getServletContext();
> @@ -85,19 +84,19 @@ public class ApplicationContextFilter im
> Map requestContextWrapper = createRequestContextWrapper(request);
> String path = ((HttpServletRequest)request).getRequestURI();
> // handle slave brokers
> - try {
> - if ( !(path.endsWith("css") || path.endsWith("png") ||
> path.endsWith("ico") || path.endsWith(slavePage))
> - &&
> ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) {
> - ((HttpServletResponse)response).sendRedirect(slavePage);
> - return;
> - }
> - } catch (Exception e) {
> - LOG.warn(path + ", failed to access BrokerFacade: reason: " +
> e.getLocalizedMessage());
> - if (LOG.isDebugEnabled()) {
> - LOG.debug(request.toString(), e);
> - }
> - throw new IOException(e);
> - }
> +// try {
> +// if ( !(path.endsWith("css") || path.endsWith("png") ||
> path.endsWith("ico") || path.endsWith(slavePage))
> +// &&
> ((BrokerFacade)requestContextWrapper.get("brokerQuery")).isSlave()) {
> +// ((HttpServletResponse)response).sendRedirect(slavePage);
> +// return;
> +// }
> +// } catch (Exception e) {
> +// LOG.warn(path + ", failed to access BrokerFacade: reason: "
> + e.getLocalizedMessage());
> +// if (LOG.isDebugEnabled()) {
> +// LOG.debug(request.toString(), e);
> +// }
> +// throw new IOException(e);
> +// }
> request.setAttribute(requestContextName, requestContextWrapper);
> request.setAttribute(requestName, request);
> chain.doFilter(request, response);
>
> Modified:
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
> (original)
> +++
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacade.java
> Mon Nov 26 21:13:25 2012
> @@ -209,6 +209,4 @@ public interface BrokerFacade {
>
> boolean isJobSchedulerStarted();
>
> - boolean isSlave() throws Exception;
> -
> }
> \ No newline at end of file
>
> Modified:
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
> URL:
> http://svn.apache.org/viewvc/activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java?rev=1413846&r1=1413845&r2=1413846&view=diff
>
> ==============================================================================
> ---
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
> (original)
> +++
> activemq/trunk/activemq-web/src/main/java/org/apache/activemq/web/BrokerFacadeSupport.java
> Mon Nov 26 21:13:25 2012
> @@ -226,8 +226,4 @@ public abstract class BrokerFacadeSuppor
> return false;
> }
> }
> -
> - public boolean isSlave() throws Exception {
> - return getBrokerAdmin().isSlave();
> - }
> }
>
>
>
--
http://redhat.com
http://blog.garytully.com