You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cs...@apache.org on 2015/09/21 19:20:11 UTC
[1/2] activemq git commit: Removing inadvertently checked in file
Repository: activemq
Updated Branches:
refs/heads/master cc9b9b084 -> 00d19e7b6
http://git-wip-us.apache.org/repos/asf/activemq/blob/00d19e7b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig b/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig
deleted file mode 100644
index e27279c..0000000
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/BrokerService.java.orig
+++ /dev/null
@@ -1,3147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.broker;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.security.Provider;
-import java.security.Security;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.SynchronousQueue;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
-import javax.annotation.PostConstruct;
-import javax.annotation.PreDestroy;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
-import org.apache.activemq.ActiveMQConnectionMetaData;
-import org.apache.activemq.ConfigurationException;
-import org.apache.activemq.Service;
-import org.apache.activemq.advisory.AdvisoryBroker;
-import org.apache.activemq.broker.cluster.ConnectionSplitBroker;
-import org.apache.activemq.broker.jmx.AnnotatedMBean;
-import org.apache.activemq.broker.jmx.BrokerMBeanSupport;
-import org.apache.activemq.broker.jmx.BrokerView;
-import org.apache.activemq.broker.jmx.ConnectorView;
-import org.apache.activemq.broker.jmx.ConnectorViewMBean;
-import org.apache.activemq.broker.jmx.HealthView;
-import org.apache.activemq.broker.jmx.HealthViewMBean;
-import org.apache.activemq.broker.jmx.JmsConnectorView;
-import org.apache.activemq.broker.jmx.JobSchedulerView;
-import org.apache.activemq.broker.jmx.JobSchedulerViewMBean;
-import org.apache.activemq.broker.jmx.Log4JConfigView;
-import org.apache.activemq.broker.jmx.ManagedRegionBroker;
-import org.apache.activemq.broker.jmx.ManagementContext;
-import org.apache.activemq.broker.jmx.NetworkConnectorView;
-import org.apache.activemq.broker.jmx.NetworkConnectorViewMBean;
-import org.apache.activemq.broker.jmx.ProxyConnectorView;
-import org.apache.activemq.broker.region.CompositeDestinationInterceptor;
-import org.apache.activemq.broker.region.Destination;
-import org.apache.activemq.broker.region.DestinationFactory;
-import org.apache.activemq.broker.region.DestinationFactoryImpl;
-import org.apache.activemq.broker.region.DestinationInterceptor;
-import org.apache.activemq.broker.region.RegionBroker;
-import org.apache.activemq.broker.region.policy.PolicyMap;
-import org.apache.activemq.broker.region.virtual.MirroredQueue;
-import org.apache.activemq.broker.region.virtual.VirtualDestination;
-import org.apache.activemq.broker.region.virtual.VirtualDestinationInterceptor;
-import org.apache.activemq.broker.region.virtual.VirtualTopic;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.broker.scheduler.SchedulerBroker;
-import org.apache.activemq.broker.scheduler.memory.InMemoryJobSchedulerStore;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQQueue;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.filter.DestinationFilter;
-import org.apache.activemq.network.ConnectionFilter;
-import org.apache.activemq.network.DiscoveryNetworkConnector;
-import org.apache.activemq.network.NetworkConnector;
-import org.apache.activemq.network.jms.JmsConnector;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.proxy.ProxyConnector;
-import org.apache.activemq.security.MessageAuthorizationPolicy;
-import org.apache.activemq.selector.SelectorParser;
-import org.apache.activemq.store.JournaledStore;
-import org.apache.activemq.store.PListStore;
-import org.apache.activemq.store.PersistenceAdapter;
-import org.apache.activemq.store.PersistenceAdapterFactory;
-import org.apache.activemq.store.memory.MemoryPersistenceAdapter;
-import org.apache.activemq.thread.Scheduler;
-import org.apache.activemq.thread.TaskRunnerFactory;
-import org.apache.activemq.transport.TransportFactorySupport;
-import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.vm.VMTransportFactory;
-import org.apache.activemq.usage.SystemUsage;
-import org.apache.activemq.util.BrokerSupport;
-import org.apache.activemq.util.DefaultIOExceptionHandler;
-import org.apache.activemq.util.IOExceptionHandler;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IOHelper;
-import org.apache.activemq.util.InetAddressUtil;
-import org.apache.activemq.util.ServiceStopper;
-import org.apache.activemq.util.StoreUtil;
-import org.apache.activemq.util.ThreadPoolUtils;
-import org.apache.activemq.util.TimeUtils;
-import org.apache.activemq.util.URISupport;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.slf4j.MDC;
-
-/**
- * Manages the life-cycle of an ActiveMQ Broker. A BrokerService consists of a
- * number of transport connectors, network connectors and a bunch of properties
- * which can be used to configure the broker as it is lazily created.
- *
- * @org.apache.xbean.XBean
- */
-public class BrokerService implements Service {
- public static final String DEFAULT_PORT = "61616";
- public static final String LOCAL_HOST_NAME;
- public static final String BROKER_VERSION;
- public static final String DEFAULT_BROKER_NAME = "localhost";
- public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
- public static final long DEFAULT_START_TIMEOUT = 600000L;
-
- private static final Logger LOG = LoggerFactory.getLogger(BrokerService.class);
-
- @SuppressWarnings("unused")
- private static final long serialVersionUID = 7353129142305630237L;
-
- private boolean useJmx = true;
- private boolean enableStatistics = true;
- private boolean persistent = true;
- private boolean populateJMSXUserID;
- private boolean useAuthenticatedPrincipalForJMSXUserID;
- private boolean populateUserNameInMBeans;
- private long mbeanInvocationTimeout = 0;
-
- private boolean useShutdownHook = true;
- private boolean useLoggingForShutdownErrors;
- private boolean shutdownOnMasterFailure;
- private boolean shutdownOnSlaveFailure;
- private boolean waitForSlave;
- private long waitForSlaveTimeout = DEFAULT_START_TIMEOUT;
- private boolean passiveSlave;
- private String brokerName = DEFAULT_BROKER_NAME;
- private File dataDirectoryFile;
- private File tmpDataDirectory;
- private Broker broker;
- private BrokerView adminView;
- private ManagementContext managementContext;
- private ObjectName brokerObjectName;
- private TaskRunnerFactory taskRunnerFactory;
- private TaskRunnerFactory persistenceTaskRunnerFactory;
- private SystemUsage systemUsage;
- private SystemUsage producerSystemUsage;
- private SystemUsage consumerSystemUsaage;
- private PersistenceAdapter persistenceAdapter;
- private PersistenceAdapterFactory persistenceFactory;
- protected DestinationFactory destinationFactory;
- private MessageAuthorizationPolicy messageAuthorizationPolicy;
- private final List<TransportConnector> transportConnectors = new CopyOnWriteArrayList<TransportConnector>();
- private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
- private final List<ProxyConnector> proxyConnectors = new CopyOnWriteArrayList<ProxyConnector>();
- private final List<JmsConnector> jmsConnectors = new CopyOnWriteArrayList<JmsConnector>();
- private final List<Service> services = new ArrayList<Service>();
- private transient Thread shutdownHook;
- private String[] transportConnectorURIs;
- private String[] networkConnectorURIs;
- private JmsConnector[] jmsBridgeConnectors; // these are Jms to Jms bridges
- // to other jms messaging systems
- private boolean deleteAllMessagesOnStartup;
- private boolean advisorySupport = true;
- private URI vmConnectorURI;
- private String defaultSocketURIString;
- private PolicyMap destinationPolicy;
- private final AtomicBoolean started = new AtomicBoolean(false);
- private final AtomicBoolean stopped = new AtomicBoolean(false);
- private final AtomicBoolean stopping = new AtomicBoolean(false);
- private BrokerPlugin[] plugins;
- private boolean keepDurableSubsActive = true;
- private boolean useVirtualTopics = true;
- private boolean useMirroredQueues = false;
- private boolean useTempMirroredQueues = true;
- private BrokerId brokerId;
- private volatile DestinationInterceptor[] destinationInterceptors;
- private ActiveMQDestination[] destinations;
- private PListStore tempDataStore;
- private int persistenceThreadPriority = Thread.MAX_PRIORITY;
- private boolean useLocalHostBrokerName;
- private final CountDownLatch stoppedLatch = new CountDownLatch(1);
- private final CountDownLatch startedLatch = new CountDownLatch(1);
- private Broker regionBroker;
- private int producerSystemUsagePortion = 60;
- private int consumerSystemUsagePortion = 40;
- private boolean splitSystemUsageForProducersConsumers;
- private boolean monitorConnectionSplits = false;
- private int taskRunnerPriority = Thread.NORM_PRIORITY;
- private boolean dedicatedTaskRunner;
- private boolean cacheTempDestinations = false;// useful for failover
- private int timeBeforePurgeTempDestinations = 5000;
- private final List<Runnable> shutdownHooks = new ArrayList<Runnable>();
- private boolean systemExitOnShutdown;
- private int systemExitOnShutdownExitCode;
- private SslContext sslContext;
- private boolean forceStart = false;
- private IOExceptionHandler ioExceptionHandler;
- private boolean schedulerSupport = false;
- private File schedulerDirectoryFile;
- private Scheduler scheduler;
- private ThreadPoolExecutor executor;
- private int schedulePeriodForDestinationPurge= 0;
- private int maxPurgedDestinationsPerSweep = 0;
- private int schedulePeriodForDiskUsageCheck = 0;
- private boolean diskUsageCheckRegrowPercentChange = true;
- private BrokerContext brokerContext;
- private boolean networkConnectorStartAsync = false;
- private boolean allowTempAutoCreationOnSend;
- private JobSchedulerStore jobSchedulerStore;
- private final AtomicLong totalConnections = new AtomicLong();
- private final AtomicInteger currentConnections = new AtomicInteger();
-
- private long offlineDurableSubscriberTimeout = -1;
- private long offlineDurableSubscriberTaskSchedule = 300000;
- private DestinationFilter virtualConsumerDestinationFilter;
-
- private final AtomicBoolean persistenceAdapterStarted = new AtomicBoolean(false);
- private Throwable startException = null;
- private boolean startAsync = false;
- private Date startDate;
- private boolean slave = true;
-
- private boolean restartAllowed = true;
- private boolean restartRequested = false;
- private boolean rejectDurableConsumers = false;
-
- private int storeOpenWireVersion = OpenWireFormat.DEFAULT_STORE_VERSION;
-
- static {
-
- try {
- ClassLoader loader = BrokerService.class.getClassLoader();
- Class<?> clazz = loader.loadClass("org.bouncycastle.jce.provider.BouncyCastleProvider");
- Provider bouncycastle = (Provider) clazz.newInstance();
- Security.insertProviderAt(bouncycastle, 2);
- LOG.info("Loaded the Bouncy Castle security provider.");
- } catch(Throwable e) {
- // No BouncyCastle found so we use the default Java Security Provider
- }
-
- String localHostName = "localhost";
- try {
- localHostName = InetAddressUtil.getLocalHostName();
- } catch (UnknownHostException e) {
- LOG.error("Failed to resolve localhost");
- }
- LOCAL_HOST_NAME = localHostName;
-
- String version = null;
- try(InputStream in = BrokerService.class.getResourceAsStream("/org/apache/activemq/version.txt")) {
- if (in != null) {
- try(InputStreamReader isr = new InputStreamReader(in);
- BufferedReader reader = new BufferedReader(isr)) {
- version = reader.readLine();
- }
- }
- } catch (IOException ie) {
- LOG.warn("Error reading broker version ", ie);
- }
- BROKER_VERSION = version;
- }
-
- @Override
- public String toString() {
- return "BrokerService[" + getBrokerName() + "]";
- }
-
- private String getBrokerVersion() {
- String version = ActiveMQConnectionMetaData.PROVIDER_VERSION;
- if (version == null) {
- version = BROKER_VERSION;
- }
-
- return version;
- }
-
- /**
- * Adds a new transport connector for the given bind address
- *
- * @return the newly created and added transport connector
- * @throws Exception
- */
- public TransportConnector addConnector(String bindAddress) throws Exception {
- return addConnector(new URI(bindAddress));
- }
-
- /**
- * Adds a new transport connector for the given bind address
- *
- * @return the newly created and added transport connector
- * @throws Exception
- */
- public TransportConnector addConnector(URI bindAddress) throws Exception {
- return addConnector(createTransportConnector(bindAddress));
- }
-
- /**
- * Adds a new transport connector for the given TransportServer transport
- *
- * @return the newly created and added transport connector
- * @throws Exception
- */
- public TransportConnector addConnector(TransportServer transport) throws Exception {
- return addConnector(new TransportConnector(transport));
- }
-
- /**
- * Adds a new transport connector
- *
- * @return the transport connector
- * @throws Exception
- */
- public TransportConnector addConnector(TransportConnector connector) throws Exception {
- transportConnectors.add(connector);
- return connector;
- }
-
- /**
- * Stops and removes a transport connector from the broker.
- *
- * @param connector
- * @return true if the connector has been previously added to the broker
- * @throws Exception
- */
- public boolean removeConnector(TransportConnector connector) throws Exception {
- boolean rc = transportConnectors.remove(connector);
- if (rc) {
- unregisterConnectorMBean(connector);
- }
- return rc;
- }
-
- /**
- * Adds a new network connector using the given discovery address
- *
- * @return the newly created and added network connector
- * @throws Exception
- */
- public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
- return addNetworkConnector(new URI(discoveryAddress));
- }
-
- /**
- * Adds a new proxy connector using the given bind address
- *
- * @return the newly created and added proxy connector
- * @throws Exception
- */
- public ProxyConnector addProxyConnector(String bindAddress) throws Exception {
- return addProxyConnector(new URI(bindAddress));
- }
-
- /**
- * Adds a new network connector using the given discovery address
- *
- * @return the newly created and added network connector
- * @throws Exception
- */
- public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
- NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
- return addNetworkConnector(connector);
- }
-
- /**
- * Adds a new proxy connector using the given bind address
- *
- * @return the newly created and added proxy connector
- * @throws Exception
- */
- public ProxyConnector addProxyConnector(URI bindAddress) throws Exception {
- ProxyConnector connector = new ProxyConnector();
- connector.setBind(bindAddress);
- connector.setRemote(new URI("fanout:multicast://default"));
- return addProxyConnector(connector);
- }
-
- /**
- * Adds a new network connector to connect this broker to a federated
- * network
- */
- public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
- connector.setBrokerService(this);
- connector.setLocalUri(getVmConnectorURI());
- // Set a connection filter so that the connector does not establish loop
- // back connections.
- connector.setConnectionFilter(new ConnectionFilter() {
- @Override
- public boolean connectTo(URI location) {
- List<TransportConnector> transportConnectors = getTransportConnectors();
- for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
- try {
- TransportConnector tc = iter.next();
- if (location.equals(tc.getConnectUri())) {
- return false;
- }
- } catch (Throwable e) {
- }
- }
- return true;
- }
- });
- networkConnectors.add(connector);
- return connector;
- }
-
- /**
- * Removes the given network connector without stopping it. The caller
- * should call {@link NetworkConnector#stop()} to close the connector
- */
- public boolean removeNetworkConnector(NetworkConnector connector) {
- boolean answer = networkConnectors.remove(connector);
- if (answer) {
- unregisterNetworkConnectorMBean(connector);
- }
- return answer;
- }
-
- public ProxyConnector addProxyConnector(ProxyConnector connector) throws Exception {
- URI uri = getVmConnectorURI();
- connector.setLocalUri(uri);
- proxyConnectors.add(connector);
- if (isUseJmx()) {
- registerProxyConnectorMBean(connector);
- }
- return connector;
- }
-
- public JmsConnector addJmsConnector(JmsConnector connector) throws Exception {
- connector.setBrokerService(this);
- jmsConnectors.add(connector);
- if (isUseJmx()) {
- registerJmsConnectorMBean(connector);
- }
- return connector;
- }
-
- public JmsConnector removeJmsConnector(JmsConnector connector) {
- if (jmsConnectors.remove(connector)) {
- return connector;
- }
- return null;
- }
-
- public void masterFailed() {
- if (shutdownOnMasterFailure) {
- LOG.error("The Master has failed ... shutting down");
- try {
- stop();
- } catch (Exception e) {
- LOG.error("Failed to stop for master failure", e);
- }
- } else {
- LOG.warn("Master Failed - starting all connectors");
- try {
- startAllConnectors();
- broker.nowMasterBroker();
- } catch (Exception e) {
- LOG.error("Failed to startAllConnectors", e);
- }
- }
- }
-
- public String getUptime() {
- long delta = getUptimeMillis();
-
- if (delta == 0) {
- return "not started";
- }
-
- return TimeUtils.printDuration(delta);
- }
-
- public long getUptimeMillis() {
- if (startDate == null) {
- return 0;
- }
-
- return new Date().getTime() - startDate.getTime();
- }
-
- public boolean isStarted() {
- return started.get() && startedLatch.getCount() == 0;
- }
-
- /**
- * Forces a start of the broker.
- * By default a BrokerService instance that was
- * previously stopped using BrokerService.stop() cannot be restarted
- * using BrokerService.start().
- * This method enforces a restart.
- * Forcing a restart of the broker is not recommended; it works only
- * for some very trivial broker configurations.
- * For restarting a broker instance we recommend to first call stop() on
- * the old instance and then recreate a new BrokerService instance.
- *
- * @param force - if true enforces a restart.
- * @throws Exception
- */
- public void start(boolean force) throws Exception {
- forceStart = force;
- stopped.set(false);
- started.set(false);
- start();
- }
-
- // Service interface
- // -------------------------------------------------------------------------
-
- protected boolean shouldAutostart() {
- return true;
- }
-
- /**
- * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
- *
- * delegates to autoStart, done to prevent backwards incompatible signature change
- */
- @PostConstruct
- private void postConstruct() {
- try {
- autoStart();
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- }
-
- /**
- *
- * @throws Exception
- * @org. apache.xbean.InitMethod
- */
- public void autoStart() throws Exception {
- if(shouldAutostart()) {
- start();
- }
- }
-
- @Override
- public void start() throws Exception {
- if (stopped.get() || !started.compareAndSet(false, true)) {
- // lets just ignore redundant start() calls
- // as it's way too easy to not be completely sure if start() has been
- // called or not with the gazillion of different configuration
- // mechanisms
- // throw new IllegalStateException("Already started.");
- return;
- }
-
- stopping.set(false);
- startDate = new Date();
- MDC.put("activemq.broker", brokerName);
-
- try {
- if (systemExitOnShutdown && useShutdownHook) {
- throw new ConfigurationException("'useShutdownHook' property cannot be be used with 'systemExitOnShutdown', please turn it off (useShutdownHook=false)");
- }
- processHelperProperties();
- if (isUseJmx()) {
- // need to remove the MDC entry while starting JMX, as it would otherwise cause leaks: spawned threads inherit the MDC and
- // we cannot clean that up during shutdown of the broker.
- MDC.remove("activemq.broker");
- try {
- startManagementContext();
- for (NetworkConnector connector : getNetworkConnectors()) {
- registerNetworkConnectorMBean(connector);
- }
- } finally {
- MDC.put("activemq.broker", brokerName);
- }
- }
-
- // in jvm master slave, let's not publish over existing broker till we get the lock
- final BrokerRegistry brokerRegistry = BrokerRegistry.getInstance();
- if (brokerRegistry.lookup(getBrokerName()) == null) {
- brokerRegistry.bind(getBrokerName(), BrokerService.this);
- }
- startPersistenceAdapter(startAsync);
- startBroker(startAsync);
- brokerRegistry.bind(getBrokerName(), BrokerService.this);
- } catch (Exception e) {
- LOG.error("Failed to start Apache ActiveMQ ({}, {})", new Object[]{ getBrokerName(), brokerId }, e);
- try {
- if (!stopped.get()) {
- stop();
- }
- } catch (Exception ex) {
- LOG.warn("Failed to stop broker after failure in start. This exception will be ignored.", ex);
- }
- throw e;
- } finally {
- MDC.remove("activemq.broker");
- }
- }
-
- private void startPersistenceAdapter(boolean async) throws Exception {
- if (async) {
- new Thread("Persistence Adapter Starting Thread") {
- @Override
- public void run() {
- try {
- doStartPersistenceAdapter();
- } catch (Throwable e) {
- startException = e;
- } finally {
- synchronized (persistenceAdapterStarted) {
- persistenceAdapterStarted.set(true);
- persistenceAdapterStarted.notifyAll();
- }
- }
- }
- }.start();
- } else {
- doStartPersistenceAdapter();
- }
- }
-
- private void doStartPersistenceAdapter() throws Exception {
- getPersistenceAdapter().setUsageManager(getProducerSystemUsage());
- getPersistenceAdapter().setBrokerName(getBrokerName());
- LOG.info("Using Persistence Adapter: {}", getPersistenceAdapter());
- if (deleteAllMessagesOnStartup) {
- deleteAllMessages();
- }
- getPersistenceAdapter().start();
-
- getJobSchedulerStore();
- if (jobSchedulerStore != null) {
- try {
- jobSchedulerStore.start();
- } catch (Exception e) {
- RuntimeException exception = new RuntimeException(
- "Failed to start job scheduler store: " + jobSchedulerStore, e);
- LOG.error(exception.getLocalizedMessage(), e);
- throw exception;
- }
- }
- }
-
- private void startBroker(boolean async) throws Exception {
- if (async) {
- new Thread("Broker Starting Thread") {
- @Override
- public void run() {
- try {
- synchronized (persistenceAdapterStarted) {
- if (!persistenceAdapterStarted.get()) {
- persistenceAdapterStarted.wait();
- }
- }
- doStartBroker();
- } catch (Throwable t) {
- startException = t;
- }
- }
- }.start();
- } else {
- doStartBroker();
- }
- }
-
- private void doStartBroker() throws Exception {
- if (startException != null) {
- return;
- }
- startDestinations();
- addShutdownHook();
-
- broker = getBroker();
- brokerId = broker.getBrokerId();
-
- // need to log this after creating the broker so we have its id and name
- LOG.info("Apache ActiveMQ {} ({}, {}) is starting", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId });
- broker.start();
-
- if (isUseJmx()) {
- if (getManagementContext().isCreateConnector() && !getManagementContext().isConnectorStarted()) {
- // try to restart management context
- // typical for slaves that use the same ports as master
- managementContext.stop();
- startManagementContext();
- }
- ManagedRegionBroker managedBroker = (ManagedRegionBroker) regionBroker;
- managedBroker.setContextBroker(broker);
- adminView.setBroker(managedBroker);
- }
-
- if (ioExceptionHandler == null) {
- setIoExceptionHandler(new DefaultIOExceptionHandler());
- }
-
- if (isUseJmx() && Log4JConfigView.isLog4JAvailable()) {
- ObjectName objectName = BrokerMBeanSupport.createLog4JConfigViewName(getBrokerObjectName().toString());
- Log4JConfigView log4jConfigView = new Log4JConfigView();
- AnnotatedMBean.registerMBean(getManagementContext(), log4jConfigView, objectName);
- }
-
- startAllConnectors();
-
- LOG.info("Apache ActiveMQ {} ({}, {}) started", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
- LOG.info("For help or more information please see: http://activemq.apache.org");
-
- getBroker().brokerServiceStarted();
- checkSystemUsageLimits();
- startedLatch.countDown();
- getBroker().nowMasterBroker();
- }
-
- /**
- * JSR-250 callback wrapper; converts checked exceptions to runtime exceptions
- *
- * delegates to stop, done to prevent backwards incompatible signature change
- */
- @PreDestroy
- private void preDestroy () {
- try {
- stop();
- } catch (Exception ex) {
- throw new RuntimeException();
- }
- }
-
- /**
- *
- * @throws Exception
- * @org.apache .xbean.DestroyMethod
- */
- @Override
- public void stop() throws Exception {
- if (!stopping.compareAndSet(false, true)) {
- LOG.trace("Broker already stopping/stopped");
- return;
- }
-
- MDC.put("activemq.broker", brokerName);
-
- if (systemExitOnShutdown) {
- new Thread() {
- @Override
- public void run() {
- System.exit(systemExitOnShutdownExitCode);
- }
- }.start();
- }
-
- LOG.info("Apache ActiveMQ {} ({}, {}) is shutting down", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId} );
-
- removeShutdownHook();
- if (this.scheduler != null) {
- this.scheduler.stop();
- this.scheduler = null;
- }
- ServiceStopper stopper = new ServiceStopper();
- if (services != null) {
- for (Service service : services) {
- stopper.stop(service);
- }
- }
- stopAllConnectors(stopper);
- this.slave = true;
- // remove any VMTransports connected
- // this has to be done after services are stopped,
- // to avoid timing issue with discovery (spinning up a new instance)
- BrokerRegistry.getInstance().unbind(getBrokerName());
- VMTransportFactory.stopped(getBrokerName());
- if (broker != null) {
- stopper.stop(broker);
- broker = null;
- }
-
- if (jobSchedulerStore != null) {
- jobSchedulerStore.stop();
- jobSchedulerStore = null;
- }
- if (tempDataStore != null) {
- tempDataStore.stop();
- tempDataStore = null;
- }
- try {
- stopper.stop(persistenceAdapter);
- persistenceAdapter = null;
- if (isUseJmx()) {
- stopper.stop(getManagementContext());
- managementContext = null;
- }
- // Clear SelectorParser cache to free memory
- SelectorParser.clearCache();
- } finally {
- started.set(false);
- stopped.set(true);
- stoppedLatch.countDown();
- }
-
- if (this.taskRunnerFactory != null) {
- this.taskRunnerFactory.shutdown();
- this.taskRunnerFactory = null;
- }
- if (this.executor != null) {
- ThreadPoolUtils.shutdownNow(executor);
- this.executor = null;
- }
-
- this.destinationInterceptors = null;
- this.destinationFactory = null;
-
- if (startDate != null) {
- LOG.info("Apache ActiveMQ {} ({}, {}) uptime {}", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId, getUptime()});
- }
- LOG.info("Apache ActiveMQ {} ({}, {}) is shutdown", new Object[]{ getBrokerVersion(), getBrokerName(), brokerId});
-
- synchronized (shutdownHooks) {
- for (Runnable hook : shutdownHooks) {
- try {
- hook.run();
- } catch (Throwable e) {
- stopper.onException(hook, e);
- }
- }
- }
-
- MDC.remove("activemq.broker");
-
- // and clear start date
- startDate = null;
-
- stopper.throwFirstException();
- }
-
- public boolean checkQueueSize(String queueName) {
- long count = 0;
- long queueSize = 0;
- Map<ActiveMQDestination, Destination> destinationMap = regionBroker.getDestinationMap();
- for (Map.Entry<ActiveMQDestination, Destination> entry : destinationMap.entrySet()) {
- if (entry.getKey().isQueue()) {
- if (entry.getValue().getName().matches(queueName)) {
- queueSize = entry.getValue().getDestinationStatistics().getMessages().getCount();
- count += queueSize;
- if (queueSize > 0) {
- LOG.info("Queue has pending message: {} queueSize is: {}", entry.getValue().getName(), queueSize);
- }
- }
- }
- }
- return count == 0;
- }
-
- /**
- * Stops the broker gracefully. Both connectorName and queueName are matched as
- * regular expressions. The method 1. stops the matching connectors (the user is
- * expected to name the connector the clients connect to); 2. checks whether any
- * message is still pending on the queues matched by queueName; 3. presumably,
- * once the connector is stopped, clients fail over to another broker and pending
- * messages are forwarded. If no messages are pending, the method finally calls
- * stop() to stop the broker.
- *
- * @param connectorName
- * @param queueName
- * @param timeout
- * @param pollInterval
- * @throws Exception
- */
- public void stopGracefully(String connectorName, String queueName, long timeout, long pollInterval) throws Exception {
- if (isUseJmx()) {
- if (connectorName == null || queueName == null || timeout <= 0) {
- throw new Exception(
- "connectorName and queueName cannot be null and timeout should be >0 for stopGracefully.");
- }
- if (pollInterval <= 0) {
- pollInterval = 30;
- }
- LOG.info("Stop gracefully with connectorName: {} queueName: {} timeout: {} pollInterval: {}", new Object[]{
- connectorName, queueName, timeout, pollInterval
- });
- TransportConnector connector;
- for (int i = 0; i < transportConnectors.size(); i++) {
- connector = transportConnectors.get(i);
- if (connector != null && connector.getName() != null && connector.getName().matches(connectorName)) {
- connector.stop();
- }
- }
- long start = System.currentTimeMillis();
- while (System.currentTimeMillis() - start < timeout * 1000) {
- // check the queue size until it reaches zero
- if (checkQueueSize(queueName)) {
- stop();
- break;
- } else {
- Thread.sleep(pollInterval * 1000);
- }
- }
- if (stopped.get()) {
- LOG.info("Successfully stop the broker.");
- } else {
- LOG.info("There is still pending message on the queue. Please check and stop the broker manually.");
- }
- }
- }
-
- /**
- * A helper method to block the caller thread until the broker has been
- * stopped
- */
- public void waitUntilStopped() {
- while (isStarted() && !stopped.get()) {
- try {
- stoppedLatch.await();
- } catch (InterruptedException e) {
- // ignore
- }
- }
- }
-
- /**
- * @return the current value of the stopped flag
- */
- public boolean isStopped() {
- return stopped.get();
- }
-
- /**
- * A helper method to block the caller thread until the broker has fully started.
- * Delegates to waitUntilStarted(long) with DEFAULT_START_TIMEOUT.
- * @return boolean true if wait succeeded false if broker was not started or was stopped
- */
- public boolean waitUntilStarted() {
- return waitUntilStarted(DEFAULT_START_TIMEOUT);
- }
-
- /**
-  * A helper method to block the caller thread until the broker has fully started
-  *
-  * @param timeout
-  *        the amount of time to wait, in milliseconds, before giving up and
-  *        returning false. Very large values are treated as "no deadline"
-  *        instead of overflowing.
-  *
-  * @return boolean true if wait succeeded false if broker was not started or was stopped
-  */
- public boolean waitUntilStarted(long timeout) {
-     boolean waitSucceeded = isStarted();
-     final long now = System.currentTimeMillis();
-     // Saturate instead of overflowing: timeout + now wraps negative for huge timeouts,
-     // which previously made the wait return immediately.
-     final long expiration = timeout > Long.MAX_VALUE - now ? Long.MAX_VALUE : Math.max(0, timeout + now);
-     boolean interrupted = false;
-     try {
-         while (!isStarted() && !stopped.get() && !waitSucceeded && expiration > System.currentTimeMillis()) {
-             if (startException != null) {
-                 return waitSucceeded;
-             }
-             try {
-                 waitSucceeded = startedLatch.await(100L, TimeUnit.MILLISECONDS);
-             } catch (InterruptedException e) {
-                 // Keep polling, but remember the interrupt so it can be restored on exit.
-                 interrupted = true;
-             }
-         }
-         return waitSucceeded;
-     } finally {
-         if (interrupted) {
-             Thread.currentThread().interrupt();
-         }
-     }
- }
-
- // Properties
- // -------------------------------------------------------------------------
- /**
-  * Returns the message broker, creating it through createBroker() on first use.
-  *
-  * @throws Exception propagated from createBroker()
-  */
- public Broker getBroker() throws Exception {
-     if (broker != null) {
-         return broker;
-     }
-     broker = createBroker();
-     return broker;
- }
-
- /**
-  * Returns the administration view of the broker; used to create and destroy
-  * resources such as queues and topics. Note this method returns null if JMX
-  * is disabled.
-  *
-  * @throws Exception propagated from getBroker()
-  */
- public BrokerView getAdminView() throws Exception {
-     if (adminView != null) {
-         return adminView;
-     }
-     // The view is not built here; calling getBroker() forces the lazy creation.
-     getBroker();
-     return adminView;
- }
-
- /**
- * @param adminView the administration view returned by getAdminView()
- */
- public void setAdminView(BrokerView adminView) {
- this.adminView = adminView;
- }
-
- /**
- * @return the name of this broker
- */
- public String getBrokerName() {
- return brokerName;
- }
-
- /**
- * Sets the name of this broker; which must be unique in the network
- *
- * @param brokerName
- */
- public void setBrokerName(String brokerName) {
- if (brokerName == null) {
- throw new NullPointerException("The broker name cannot be null");
- }
- String str = brokerName.replaceAll("[^a-zA-Z0-9\\.\\_\\-\\:]", "_");
- if (!str.equals(brokerName)) {
- LOG.error("Broker Name: {} contained illegal characters - replaced with {}", brokerName, str);
- }
- this.brokerName = str.trim();
- }
-
- /**
- * @return the configured persistence adapter factory
- */
- public PersistenceAdapterFactory getPersistenceFactory() {
- return persistenceFactory;
- }
-
- /**
-  * Returns the data directory, defaulting it from
-  * IOHelper.getDefaultDataDirectory() the first time it is requested unset.
-  */
- public File getDataDirectoryFile() {
-     if (dataDirectoryFile != null) {
-         return dataDirectoryFile;
-     }
-     dataDirectoryFile = new File(IOHelper.getDefaultDataDirectory());
-     return dataDirectoryFile;
- }
-
- /**
-  * Returns the directory named after this broker inside the data directory.
-  */
- public File getBrokerDataDirectory() {
-     final String childName = getBrokerName();
-     final File parent = getDataDirectoryFile();
-     return new File(parent, childName);
- }
-
- /**
- * Sets the directory in which the data files will be stored by default for
- * the JDBC and Journal persistence adaptors.
- *
- * @param dataDirectory
- * the directory to store data files
- */
- public void setDataDirectory(String dataDirectory) {
-     // Wrap the path and hand it to the File-based setter.
-     final File directory = new File(dataDirectory);
-     setDataDirectoryFile(directory);
- }
-
- /**
- * Sets the directory in which the data files will be stored by default for
- * the JDBC and Journal persistence adaptors. The reference is stored as
- * given; no directory is created by this method.
- *
- * @param dataDirectoryFile
- * the directory to store data files
- */
- public void setDataDirectoryFile(File dataDirectoryFile) {
- this.dataDirectoryFile = dataDirectoryFile;
- }
-
- /**
-  * Returns the temporary data directory. When none has been set it defaults
-  * to "tmp_storage" inside the broker data directory.
-  *
-  * @return the tmpDataDirectory
-  */
- public File getTmpDataDirectory() {
-     if (tmpDataDirectory != null) {
-         return tmpDataDirectory;
-     }
-     tmpDataDirectory = new File(getBrokerDataDirectory(), "tmp_storage");
-     return tmpDataDirectory;
- }
-
- /**
- * Overrides the default temporary data directory returned by
- * getTmpDataDirectory().
- *
- * @param tmpDataDirectory
- * the tmpDataDirectory to set
- */
- public void setTmpDataDirectory(File tmpDataDirectory) {
- this.tmpDataDirectory = tmpDataDirectory;
- }
-
- /**
- * @param persistenceFactory the persistence adapter factory to store
- */
- public void setPersistenceFactory(PersistenceAdapterFactory persistenceFactory) {
- this.persistenceFactory = persistenceFactory;
- }
-
- /**
- * @param destinationFactory the destination factory to store
- */
- public void setDestinationFactory(DestinationFactory destinationFactory) {
- this.destinationFactory = destinationFactory;
- }
-
- /**
- * @return the value of the persistent flag
- */
- public boolean isPersistent() {
- return persistent;
- }
-
- /**
- * Sets whether or not persistence is enabled or disabled.
- * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
- * @param persistent the new value of the persistent flag
- */
- public void setPersistent(boolean persistent) {
- this.persistent = persistent;
- }
-
- /**
- * @return the value of the populateJMSXUserID flag
- */
- public boolean isPopulateJMSXUserID() {
- return populateJMSXUserID;
- }
-
- /**
- * Sets whether or not the broker should populate the JMSXUserID header.
- *
- * @param populateJMSXUserID the new value of the flag
- */
- public void setPopulateJMSXUserID(boolean populateJMSXUserID) {
- this.populateJMSXUserID = populateJMSXUserID;
- }
-
- /**
-  * Returns the broker-wide SystemUsage, creating it on first use with the
-  * default limits below and registering it as a broker service.
-  *
-  * @return the system usage; never null
-  * @throws RuntimeException wrapping the IOException if a backing store cannot be obtained
-  */
- public SystemUsage getSystemUsage() {
-     try {
-         if (systemUsage == null) {
-             systemUsage = new SystemUsage("Main", getPersistenceAdapter(), getTempDataStore(), getJobSchedulerStore());
-             systemUsage.setExecutor(getExecutor());
-             systemUsage.getMemoryUsage().setLimit(1024L * 1024 * 1024 * 1); // 1 GB
-             systemUsage.getTempUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
-             systemUsage.getStoreUsage().setLimit(1024L * 1024 * 1024 * 100); // 100 GB
-             systemUsage.getJobSchedulerUsage().setLimit(1024L * 1024 * 1024 * 50); // 50 GB
-             addService(this.systemUsage);
-         }
-         return systemUsage;
-     } catch (IOException e) {
-         LOG.error("Cannot create SystemUsage", e);
-         // Separator added: the cause text was previously glued onto "SystemUsage".
-         throw new RuntimeException("Fatally failed to create SystemUsage: " + e.getMessage(), e);
-     }
- }
-
- /**
-  * Replaces the broker-wide SystemUsage. Any previous instance is removed
-  * from the broker services, the new one is given the broker executor if it
-  * has none, and it is then registered as a service.
-  *
-  * @param memoryManager the new system usage
-  */
- public void setSystemUsage(SystemUsage memoryManager) {
-     final SystemUsage previous = this.systemUsage;
-     if (previous != null) {
-         removeService(previous);
-     }
-     this.systemUsage = memoryManager;
-     if (memoryManager.getExecutor() == null) {
-         memoryManager.setExecutor(getExecutor());
-     }
-     addService(memoryManager);
- }
-
- /**
-  * Returns the SystemUsage applied to consumers, creating it on first use.
-  * With splitSystemUsageForProducersConsumers enabled this is a child of the
-  * broker-wide usage limited to consumerSystemUsagePortion percent of its
-  * memory; otherwise it is the broker-wide usage itself.
-  *
-  * @return the consumer system usage
-  * @throws IOException
-  */
- public SystemUsage getConsumerSystemUsage() throws IOException {
-     if (this.consumerSystemUsaage != null) {
-         return this.consumerSystemUsaage;
-     }
-     if (!splitSystemUsageForProducersConsumers) {
-         consumerSystemUsaage = getSystemUsage();
-         return this.consumerSystemUsaage;
-     }
-     this.consumerSystemUsaage = new SystemUsage(getSystemUsage(), "Consumer");
-     final float share = consumerSystemUsagePortion / 100f;
-     this.consumerSystemUsaage.getMemoryUsage().setUsagePortion(share);
-     addService(this.consumerSystemUsaage);
-     return this.consumerSystemUsaage;
- }
-
- /**
-  * Replaces the consumer SystemUsage: the previous instance, if any, is
-  * removed from the broker services and the new one is registered.
-  *
-  * @param consumerSystemUsaage
-  *        the consumer system usage to set
-  */
- public void setConsumerSystemUsage(SystemUsage consumerSystemUsaage) {
-     final SystemUsage previous = this.consumerSystemUsaage;
-     if (previous != null) {
-         removeService(previous);
-     }
-     this.consumerSystemUsaage = consumerSystemUsaage;
-     addService(consumerSystemUsaage);
- }
-
- /**
-  * Returns the SystemUsage applied to producers, creating it on first use.
-  * With splitSystemUsageForProducersConsumers enabled this is a child of the
-  * broker-wide usage limited to producerSystemUsagePortion percent of its
-  * memory; otherwise it is the broker-wide usage itself.
-  *
-  * @return the producer system usage
-  * @throws IOException
-  */
- public SystemUsage getProducerSystemUsage() throws IOException {
-     if (producerSystemUsage != null) {
-         return producerSystemUsage;
-     }
-     if (!splitSystemUsageForProducersConsumers) {
-         producerSystemUsage = getSystemUsage();
-         return producerSystemUsage;
-     }
-     producerSystemUsage = new SystemUsage(getSystemUsage(), "Producer");
-     final float share = producerSystemUsagePortion / 100f;
-     producerSystemUsage.getMemoryUsage().setUsagePortion(share);
-     addService(producerSystemUsage);
-     return producerSystemUsage;
- }
-
- /**
-  * Replaces the producer SystemUsage: the previous instance, if any, is
-  * removed from the broker services and the new one is registered.
-  *
-  * @param producerUsageManager
-  *        the producer system usage to set
-  */
- public void setProducerSystemUsage(SystemUsage producerUsageManager) {
-     final SystemUsage previous = this.producerSystemUsage;
-     if (previous != null) {
-         removeService(previous);
-     }
-     this.producerSystemUsage = producerUsageManager;
-     addService(producerUsageManager);
- }
-
- /**
-  * Returns the persistence adapter. On first use it is created via
-  * createPersistenceAdapter(), configured as a service, and replaced by
-  * whatever registerPersistenceAdapterMBean returns.
-  *
-  * @throws IOException
-  */
- public PersistenceAdapter getPersistenceAdapter() throws IOException {
-     if (persistenceAdapter != null) {
-         return persistenceAdapter;
-     }
-     persistenceAdapter = createPersistenceAdapter();
-     configureService(persistenceAdapter);
-     this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
-     return persistenceAdapter;
- }
-
- /**
- * Sets the persistence adaptor implementation to use for this broker
- *
- * @throws IOException
- */
- public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) throws IOException {
-     // A non-memory adapter is ignored (with a warning) while persistence is switched off.
-     final boolean accepted = isPersistent() || persistenceAdapter instanceof MemoryPersistenceAdapter;
-     if (accepted) {
-         this.persistenceAdapter = persistenceAdapter;
-         configureService(this.persistenceAdapter);
-         this.persistenceAdapter = registerPersistenceAdapterMBean(persistenceAdapter);
-     } else {
-         LOG.warn("persistent=\"false\", ignoring configured persistenceAdapter: {}", persistenceAdapter);
-     }
- }
-
- /**
-  * Returns the broker task runner factory, creating it on first use with a
-  * thread name derived from the broker name and this class's class loader.
-  */
- public TaskRunnerFactory getTaskRunnerFactory() {
-     if (this.taskRunnerFactory != null) {
-         return this.taskRunnerFactory;
-     }
-     final String threadName = "ActiveMQ BrokerService["+getBrokerName()+"] Task";
-     this.taskRunnerFactory = new TaskRunnerFactory(threadName, getTaskRunnerPriority(), true, 1000,
-             isDedicatedTaskRunner());
-     this.taskRunnerFactory.setThreadClassLoader(this.getClass().getClassLoader());
-     return this.taskRunnerFactory;
- }
-
- /**
- * @param taskRunnerFactory the factory returned by getTaskRunnerFactory() from now on
- */
- public void setTaskRunnerFactory(TaskRunnerFactory taskRunnerFactory) {
- this.taskRunnerFactory = taskRunnerFactory;
- }
-
- /**
-  * Returns the task runner factory used for persistence work, creating it on
-  * first use with the configured persistence thread priority.
-  *
-  * @return the persistence task runner factory; never null
-  */
- public TaskRunnerFactory getPersistenceTaskRunnerFactory() {
-     // Guard on the field being initialised. The previous check looked at
-     // taskRunnerFactory, which could return null or discard a configured factory.
-     if (persistenceTaskRunnerFactory == null) {
-         persistenceTaskRunnerFactory = new TaskRunnerFactory("Persistence Adaptor Task", persistenceThreadPriority,
-                 true, 1000, isDedicatedTaskRunner());
-     }
-     return persistenceTaskRunnerFactory;
- }
-
- /**
- * @param persistenceTaskRunnerFactory the task runner factory to use for persistence work
- */
- public void setPersistenceTaskRunnerFactory(TaskRunnerFactory persistenceTaskRunnerFactory) {
- this.persistenceTaskRunnerFactory = persistenceTaskRunnerFactory;
- }
-
- /**
- * @return the value of the useJmx flag
- */
- public boolean isUseJmx() {
- return useJmx;
- }
-
- /**
- * @return the value of the enableStatistics flag
- */
- public boolean isEnableStatistics() {
- return enableStatistics;
- }
-
- /**
- * Sets whether or not the Broker's services enable statistics or not.
- *
- * @param enableStatistics the new value of the flag
- */
- public void setEnableStatistics(boolean enableStatistics) {
- this.enableStatistics = enableStatistics;
- }
-
- /**
- * Sets whether or not the Broker's services should be exposed into JMX or
- * not.
- * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
- * @param useJmx the new value of the flag
- */
- public void setUseJmx(boolean useJmx) {
- this.useJmx = useJmx;
- }
-
- /**
-  * Returns the JMX ObjectName for this broker, building it with
-  * createBrokerObjectName() when none has been set.
-  *
-  * @throws MalformedObjectNameException propagated from createBrokerObjectName()
-  */
- public ObjectName getBrokerObjectName() throws MalformedObjectNameException {
-     if (brokerObjectName != null) {
-         return brokerObjectName;
-     }
-     brokerObjectName = createBrokerObjectName();
-     return brokerObjectName;
- }
-
- /**
- * Sets the JMX ObjectName for this broker
- *
- * @param brokerObjectName the name returned by getBrokerObjectName() from now on
- */
- public void setBrokerObjectName(ObjectName brokerObjectName) {
- this.brokerObjectName = brokerObjectName;
- }
-
- /**
-  * Returns the management context, creating a default ManagementContext
-  * when none has been set.
-  */
- public ManagementContext getManagementContext() {
-     if (managementContext != null) {
-         return managementContext;
-     }
-     managementContext = new ManagementContext();
-     return managementContext;
- }
-
- /**
- * @param managementContext the management context to use instead of the lazily created default
- */
- public void setManagementContext(ManagementContext managementContext) {
- this.managementContext = managementContext;
- }
-
- public NetworkConnector getNetworkConnectorByName(String connectorName) {
- for (NetworkConnector connector : networkConnectors) {
- if (connector.getName().equals(connectorName)) {
- return connector;
- }
- }
- return null;
- }
-
- /**
- * @return the configured network connector URIs (the internal array, not a copy)
- */
- public String[] getNetworkConnectorURIs() {
- return networkConnectorURIs;
- }
-
- /**
- * @param networkConnectorURIs URIs that processHelperProperties() passes to addNetworkConnector
- */
- public void setNetworkConnectorURIs(String[] networkConnectorURIs) {
- this.networkConnectorURIs = networkConnectorURIs;
- }
-
- /**
-  * Finds the first transport connector whose name equals connectorName.
-  * Connectors without a name are skipped rather than causing a
-  * NullPointerException.
-  *
-  * @param connectorName the name to look for
-  * @return the matching connector, or null when there is none
-  */
- public TransportConnector getConnectorByName(String connectorName) {
-     for (TransportConnector connector : transportConnectors) {
-         final String name = connector.getName();
-         // stopGracefully treats a null connector name as possible, so guard here too.
-         if (name != null && name.equals(connectorName)) {
-             return connector;
-         }
-     }
-     return null;
- }
-
- /**
-  * Builds a map from lower-cased URI scheme to connect URI string for the
-  * transport connectors. Connectors whose URI or scheme is null, or whose
-  * URI lookup fails, are left out; a later connector with the same scheme
-  * overwrites an earlier one.
-  */
- public Map<String, String> getTransportConnectorURIsAsMap() {
-     final Map<String, String> urisByScheme = new HashMap<String, String>();
-     for (TransportConnector connector : transportConnectors) {
-         try {
-             final URI connectUri = connector.getConnectUri();
-             if (connectUri == null) {
-                 continue;
-             }
-             final String scheme = connectUri.getScheme();
-             if (scheme == null) {
-                 continue;
-             }
-             urisByScheme.put(scheme.toLowerCase(Locale.ENGLISH), connectUri.toString());
-         } catch (Exception e) {
-             LOG.debug("Failed to read URI to build transportURIsAsMap", e);
-         }
-     }
-     return urisByScheme;
- }
-
- /**
-  * Searches the connections of every transport connector for an existing
-  * ProducerBrokerExchange belonging to producerInfo.
-  *
-  * @return the first exchange found, or null when no connection has one
-  */
- public ProducerBrokerExchange getProducerBrokerExchange(ProducerInfo producerInfo){
-     for (TransportConnector connector : transportConnectors) {
-         for (TransportConnection connection : connector.getConnections()) {
-             final ProducerBrokerExchange exchange = connection.getProducerBrokerExchangeIfExists(producerInfo);
-             if (exchange != null) {
-                 return exchange;
-             }
-         }
-     }
-     return null;
- }
-
- /**
- * @return the configured transport connector URIs (the internal array, not a copy)
- */
- public String[] getTransportConnectorURIs() {
- return transportConnectorURIs;
- }
-
- /**
- * @param transportConnectorURIs URIs that processHelperProperties() passes to addConnector
- */
- public void setTransportConnectorURIs(String[] transportConnectorURIs) {
- this.transportConnectorURIs = transportConnectorURIs;
- }
-
- /**
- * @return Returns the jmsBridgeConnectors (the internal array, not a copy).
- */
- public JmsConnector[] getJmsBridgeConnectors() {
- return jmsBridgeConnectors;
- }
-
- /**
- * @param jmsConnectors
- * The jmsBridgeConnectors to set; processHelperProperties() passes each
- * one to addJmsConnector.
- */
- public void setJmsBridgeConnectors(JmsConnector[] jmsConnectors) {
- this.jmsBridgeConnectors = jmsConnectors;
- }
-
- /**
- * @return a new array holding the services currently registered with this broker
- */
- public Service[] getServices() {
- return services.toArray(new Service[0]);
- }
-
- /**
- * Sets the services associated with this broker.
- */
- public void setServices(Service[] services) {
-     // Always start from an empty list; a null array simply leaves it empty.
-     this.services.clear();
-     if (services == null) {
-         return;
-     }
-     for (Service service : services) {
-         this.services.add(service);
-     }
- }
-
- /**
- * Adds a new service so that it will be started as part of the broker
- * lifecycle
- *
- * @param service the service to append to the broker's service list
- */
- public void addService(Service service) {
- services.add(service);
- }
-
- /**
- * @param service the service to remove from the broker's service list
- */
- public void removeService(Service service) {
- services.remove(service);
- }
-
- /**
- * @return the value of the useLoggingForShutdownErrors flag
- */
- public boolean isUseLoggingForShutdownErrors() {
- return useLoggingForShutdownErrors;
- }
-
- /**
- * Sets whether or not we should use commons-logging when reporting errors
- * when shutting down the broker
- *
- * @param useLoggingForShutdownErrors the new value of the flag
- */
- public void setUseLoggingForShutdownErrors(boolean useLoggingForShutdownErrors) {
- this.useLoggingForShutdownErrors = useLoggingForShutdownErrors;
- }
-
- /**
- * @return the value of the useShutdownHook flag
- */
- public boolean isUseShutdownHook() {
- return useShutdownHook;
- }
-
- /**
- * Sets whether or not we should use a shutdown handler to close down the
- * broker cleanly if the JVM is terminated. It is recommended you leave this
- * enabled.
- *
- * @param useShutdownHook the new value of the flag
- */
- public void setUseShutdownHook(boolean useShutdownHook) {
- this.useShutdownHook = useShutdownHook;
- }
-
- /**
- * @return the value of the advisorySupport flag
- */
- public boolean isAdvisorySupport() {
- return advisorySupport;
- }
-
- /**
- * Allows the support of advisory messages to be disabled for performance
- * reasons.
- * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
- * @param advisorySupport the new value of the flag
- */
- public void setAdvisorySupport(boolean advisorySupport) {
- this.advisorySupport = advisorySupport;
- }
-
- /**
- * @return a new list containing the current transport connectors; changes
- * to the returned list do not affect the broker
- */
- public List<TransportConnector> getTransportConnectors() {
- return new ArrayList<TransportConnector>(transportConnectors);
- }
-
- /**
- * Sets the transport connectors which this broker will listen on for new
- * clients. Each connector is passed to addConnector; connectors that are
- * already registered are not cleared by this method.
- *
- * @org.apache.xbean.Property
- * nestedType="org.apache.activemq.broker.TransportConnector"
- */
- public void setTransportConnectors(List<TransportConnector> transportConnectors) throws Exception {
- for (TransportConnector connector : transportConnectors) {
- addConnector(connector);
- }
- }
-
- /**
-  * Finds the first transport connector whose name equals the given name.
-  * A null name matches nothing, consistent with getConnectorByName.
-  *
-  * @param name the connector name to look for
-  * @return the matching connector, or null when there is none
-  */
- public TransportConnector getTransportConnectorByName(String name){
-     if (name == null) {
-         // Previously name.equals(...) threw a NullPointerException here.
-         return null;
-     }
-     for (TransportConnector transportConnector : transportConnectors){
-         if (name.equals(transportConnector.getName())){
-             return transportConnector;
-         }
-     }
-     return null;
- }
-
- public TransportConnector getTransportConnectorByScheme(String scheme){
- for (TransportConnector transportConnector : transportConnectors){
- if (scheme.equals(transportConnector.getUri().getScheme())){
- return transportConnector;
- }
- }
- return null;
- }
-
- /**
- * @return a new list containing the current network connectors
- */
- public List<NetworkConnector> getNetworkConnectors() {
- return new ArrayList<NetworkConnector>(networkConnectors);
- }
-
- /**
- * @return a new list containing the current proxy connectors
- */
- public List<ProxyConnector> getProxyConnectors() {
- return new ArrayList<ProxyConnector>(proxyConnectors);
- }
-
- /**
- * Sets the network connectors which this broker will use to connect to
- * other brokers in a federated network. Each element is cast to
- * NetworkConnector and passed to addNetworkConnector; connectors that are
- * already registered are not cleared by this method.
- *
- * @org.apache.xbean.Property
- * nestedType="org.apache.activemq.network.NetworkConnector"
- */
- public void setNetworkConnectors(List<?> networkConnectors) throws Exception {
- for (Object connector : networkConnectors) {
- addNetworkConnector((NetworkConnector) connector);
- }
- }
-
- /**
- * Sets the proxy connectors of this broker. Each element is cast to
- * ProxyConnector and passed to addProxyConnector; connectors that are
- * already registered are not cleared by this method.
- */
- public void setProxyConnectors(List<?> proxyConnectors) throws Exception {
- for (Object connector : proxyConnectors) {
- addProxyConnector((ProxyConnector) connector);
- }
- }
-
- /**
- * @return the configured destination policy map
- */
- public PolicyMap getDestinationPolicy() {
- return destinationPolicy;
- }
-
- /**
- * Sets the destination specific policies available either for exact
- * destinations or for wildcard areas of destinations.
- *
- * @param policyMap the policy map to store
- */
- public void setDestinationPolicy(PolicyMap policyMap) {
- this.destinationPolicy = policyMap;
- }
-
- /**
- * @return the configured broker plugins (the internal array, not a copy)
- */
- public BrokerPlugin[] getPlugins() {
- return plugins;
- }
-
- /**
- * Sets a number of broker plugins to install such as for security
- * authentication or authorization
- *
- * @param plugins the plugins to store
- */
- public void setPlugins(BrokerPlugin[] plugins) {
- this.plugins = plugins;
- }
-
- /**
- * @return the configured message authorization policy
- */
- public MessageAuthorizationPolicy getMessageAuthorizationPolicy() {
- return messageAuthorizationPolicy;
- }
-
- /**
- * Sets the policy used to decide if the current connection is authorized to
- * consume a given message
- *
- * @param messageAuthorizationPolicy the policy to store
- */
- public void setMessageAuthorizationPolicy(MessageAuthorizationPolicy messageAuthorizationPolicy) {
- this.messageAuthorizationPolicy = messageAuthorizationPolicy;
- }
-
- /**
- * Delete all messages from the persistent store. Delegates to the
- * persistence adapter, which getPersistenceAdapter() creates if necessary.
- *
- * @throws IOException
- */
- public void deleteAllMessages() throws IOException {
- getPersistenceAdapter().deleteAllMessages();
- }
-
- /**
- * @return the value of the deleteAllMessagesOnStartup flag
- */
- public boolean isDeleteAllMessagesOnStartup() {
- return deleteAllMessagesOnStartup;
- }
-
- /**
- * Sets whether or not all messages are deleted on startup - mostly only
- * useful for testing.
- * @org.apache.xbean.Property propertyEditor="org.apache.activemq.util.BooleanEditor"
- * @param deletePersistentMessagesOnStartup the new value of the deleteAllMessagesOnStartup flag
- */
- public void setDeleteAllMessagesOnStartup(boolean deletePersistentMessagesOnStartup) {
- this.deleteAllMessagesOnStartup = deletePersistentMessagesOnStartup;
- }
-
- /**
-  * Returns the VM transport URI of this broker. When none has been set it
-  * is built as "vm://" plus the broker name with every character outside
-  * letters, digits, '.', '_' and '-' replaced by '_'.
-  *
-  * @return the URI, or null if it was unset and could not be built
-  */
- public URI getVmConnectorURI() {
-     if (vmConnectorURI != null) {
-         return vmConnectorURI;
-     }
-     try {
-         final String safeName = getBrokerName().replaceAll("[^a-zA-Z0-9\\.\\_\\-]", "_");
-         vmConnectorURI = new URI("vm://" + safeName);
-     } catch (URISyntaxException e) {
-         LOG.error("Badly formed URI from {}", getBrokerName(), e);
-     }
-     return vmConnectorURI;
- }
-
- /**
- * @param vmConnectorURI the URI returned by getVmConnectorURI() instead of the derived default
- */
- public void setVmConnectorURI(URI vmConnectorURI) {
- this.vmConnectorURI = vmConnectorURI;
- }
-
- /**
-  * Returns the default socket URI of a started broker and caches it once
-  * found. The first connector with a publishable connect string that has
-  * updateClusterClients or rebalanceClusterClients enabled wins; failing
-  * that, the first connector with a publishable connect string is used.
-  *
-  * @return the URI string, or null when the broker is not started or no
-  *         connector yields a connect string
-  */
- public String getDefaultSocketURIString() {
-     if (!started.get()) {
-         return null;
-     }
-     if (this.defaultSocketURIString == null) {
-         for (TransportConnector tc : this.transportConnectors) {
-             String candidate = null;
-             try {
-                 candidate = tc.getPublishableConnectString();
-             } catch (Exception e) {
-                 LOG.warn("Failed to get the ConnectURI for {}", tc, e);
-             }
-             if (candidate == null) {
-                 continue;
-             }
-             // find first publishable uri
-             if (tc.isUpdateClusterClients() || tc.isRebalanceClusterClients()) {
-                 this.defaultSocketURIString = candidate;
-                 break;
-             }
-             // or use the first defined
-             if (this.defaultSocketURIString == null) {
-                 this.defaultSocketURIString = candidate;
-             }
-         }
-     }
-     return this.defaultSocketURIString;
- }
-
- /**
- * @return Returns the value of the shutdownOnMasterFailure flag.
- */
- public boolean isShutdownOnMasterFailure() {
- return shutdownOnMasterFailure;
- }
-
- /**
- * @param shutdownOnMasterFailure
- * The new value of the shutdownOnMasterFailure flag.
- */
- public void setShutdownOnMasterFailure(boolean shutdownOnMasterFailure) {
- this.shutdownOnMasterFailure = shutdownOnMasterFailure;
- }
-
- /**
- * @return the value of the keepDurableSubsActive flag
- */
- public boolean isKeepDurableSubsActive() {
- return keepDurableSubsActive;
- }
-
- /**
- * @param keepDurableSubsActive the new value of the keepDurableSubsActive flag
- */
- public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
- this.keepDurableSubsActive = keepDurableSubsActive;
- }
-
- /**
- * @return the value of the useVirtualTopics flag
- */
- public boolean isUseVirtualTopics() {
- return useVirtualTopics;
- }
-
- /**
- * Sets whether or not <a
- * href="http://activemq.apache.org/virtual-destinations.html">Virtual
- * Topics</a> should be supported by default if they have not been
- * explicitly configured.
- *
- * @param useVirtualTopics the new value of the flag
- */
- public void setUseVirtualTopics(boolean useVirtualTopics) {
- this.useVirtualTopics = useVirtualTopics;
- }
-
- /**
- * @return the configured destination interceptors (the internal array, not a copy)
- */
- public DestinationInterceptor[] getDestinationInterceptors() {
- return destinationInterceptors;
- }
-
- /**
- * @return the value of the useMirroredQueues flag
- */
- public boolean isUseMirroredQueues() {
- return useMirroredQueues;
- }
-
- /**
- * Sets whether or not <a
- * href="http://activemq.apache.org/mirrored-queues.html">Mirrored
- * Queues</a> should be supported by default if they have not been
- * explicitly configured.
- *
- * @param useMirroredQueues the new value of the flag
- */
- public void setUseMirroredQueues(boolean useMirroredQueues) {
- this.useMirroredQueues = useMirroredQueues;
- }
-
- /**
- * Sets the destination interceptors to use
- *
- * @param destinationInterceptors the interceptors to store
- */
- public void setDestinationInterceptors(DestinationInterceptor[] destinationInterceptors) {
- this.destinationInterceptors = destinationInterceptors;
- }
-
- /**
- * @return the destinations configured for loading/creation on startup (the internal array, not a copy)
- */
- public ActiveMQDestination[] getDestinations() {
- return destinations;
- }
-
- /**
- * Sets the destinations which should be loaded/created on startup
- *
- * @param destinations the destinations to store
- */
- public void setDestinations(ActiveMQDestination[] destinations) {
- this.destinations = destinations;
- }
-
- /**
- * Returns the temporary data store, creating and starting one on first use.
- * Returns null when the broker is not persistent. If the persistence
- * adapter is itself a PListStore it is returned directly (and not cached
- * in the tempDataStore field). Otherwise the plain files in the temporary
- * data directory are deleted and a PListStoreImpl is loaded reflectively,
- * pointed at that directory, configured and started.
- *
- * @return the tempDataStore
- * @throws RuntimeException wrapping any failure to obtain the persistence
- * adapter or to create/start the store
- */
- public synchronized PListStore getTempDataStore() {
- if (tempDataStore == null) {
- if (!isPersistent()) {
- return null;
- }
-
- try {
- // Prefer the persistence adapter when it can act as the temp store itself.
- PersistenceAdapter pa = getPersistenceAdapter();
- if( pa!=null && pa instanceof PListStore) {
- return (PListStore) pa;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- // result: every attempted delete succeeded; empty: nothing was found to delete.
- boolean result = true;
- boolean empty = true;
- try {
- File directory = getTmpDataDirectory();
- if (directory.exists() && directory.isDirectory()) {
- File[] files = directory.listFiles();
- if (files != null && files.length > 0) {
- empty = false;
- for (int i = 0; i < files.length; i++) {
- File file = files[i];
- // Only top-level plain files are removed; sub-directories are left alone.
- if (!file.isDirectory()) {
- result &= file.delete();
- }
- }
- }
- }
- if (!empty) {
- String str = result ? "Successfully deleted" : "Failed to delete";
- LOG.info("{} temporary storage", str);
- }
-
- // Loaded by name so this class has no compile-time reference to the implementation.
- String clazz = "org.apache.activemq.store.kahadb.plist.PListStoreImpl";
- this.tempDataStore = (PListStore) getClass().getClassLoader().loadClass(clazz).newInstance();
- this.tempDataStore.setDirectory(getTmpDataDirectory());
- configureService(tempDataStore);
- this.tempDataStore.start();
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return tempDataStore;
- }
-
- /**
- * Stores the given temp data store, configures it as a service and starts
- * it immediately.
- *
- * @param tempDataStore
- * the tempDataStore to set
- * @throws RuntimeException if the store fails to start; the failure is
- * logged and wrapped
- */
- public void setTempDataStore(PListStore tempDataStore) {
- this.tempDataStore = tempDataStore;
- configureService(tempDataStore);
- try {
- tempDataStore.start();
- } catch (Exception e) {
- RuntimeException exception = new RuntimeException("Failed to start provided temp data store: " + tempDataStore, e);
- LOG.error(exception.getLocalizedMessage(), e);
- throw exception;
- }
- }
-
- /**
- * @return the thread priority passed to the persistence task runner factory
- */
- public int getPersistenceThreadPriority() {
- return persistenceThreadPriority;
- }
-
- /**
- * @param persistenceThreadPriority the thread priority passed to the persistence task runner factory
- */
- public void setPersistenceThreadPriority(int persistenceThreadPriority) {
- this.persistenceThreadPriority = persistenceThreadPriority;
- }
-
- /**
- * @return the value of the useLocalHostBrokerName flag
- */
- public boolean isUseLocalHostBrokerName() {
- return this.useLocalHostBrokerName;
- }
-
- /**
-  * Sets the useLocalHostBrokerName flag. When the flag is enabled, the
-  * broker has not been started, and the broker name is unset or still the
-  * default, the broker name is switched to LOCAL_HOST_NAME.
-  *
-  * @param useLocalHostBrokerName
-  *        the useLocalHostBrokerName to set
-  */
- public void setUseLocalHostBrokerName(boolean useLocalHostBrokerName) {
-     this.useLocalHostBrokerName = useLocalHostBrokerName;
-     // Parenthesised so the flag and started guards apply to both name checks:
-     // without them && bound tighter than ||, and a default-named broker was
-     // renamed even when the flag was false or the broker was already running.
-     // equals() replaces the identity comparison of two strings.
-     final boolean nameIsDefault = brokerName == null || brokerName.equals(DEFAULT_BROKER_NAME);
-     if (useLocalHostBrokerName && !started.get() && nameIsDefault) {
-         brokerName = LOCAL_HOST_NAME;
-     }
- }
-
- /**
- * Looks up and lazily creates if necessary the destination for the given
- * JMS name. Delegates to Broker.addDestination using the admin connection
- * context.
- *
- * @throws Exception propagated from getBroker() or addDestination
- */
- public Destination getDestination(ActiveMQDestination destination) throws Exception {
- return getBroker().addDestination(getAdminConnectionContext(), destination,false);
- }
-
- /**
- * Removes the given destination through Broker.removeDestination, using
- * the admin connection context and a timeout argument of 0.
- *
- * @throws Exception propagated from getBroker() or removeDestination
- */
- public void removeDestination(ActiveMQDestination destination) throws Exception {
- getBroker().removeDestination(getAdminConnectionContext(), destination, 0);
- }
-
- /**
- * @return the percentage of memory usage given to producers when system usage is split
- */
- public int getProducerSystemUsagePortion() {
- return producerSystemUsagePortion;
- }
-
- /**
- * @param producerSystemUsagePortion percentage (divided by 100 in getProducerSystemUsage()) of memory usage for producers
- */
- public void setProducerSystemUsagePortion(int producerSystemUsagePortion) {
- this.producerSystemUsagePortion = producerSystemUsagePortion;
- }
-
- /**
- * @return the percentage of memory usage given to consumers when system usage is split
- */
- public int getConsumerSystemUsagePortion() {
- return consumerSystemUsagePortion;
- }
-
- /**
- * @param consumerSystemUsagePortion percentage (divided by 100 in getConsumerSystemUsage()) of memory usage for consumers
- */
- public void setConsumerSystemUsagePortion(int consumerSystemUsagePortion) {
- this.consumerSystemUsagePortion = consumerSystemUsagePortion;
- }
-
- /**
- * @return true if producers and consumers get separate child SystemUsage instances
- */
- public boolean isSplitSystemUsageForProducersConsumers() {
- return splitSystemUsageForProducersConsumers;
- }
-
- /**
- * @param splitSystemUsageForProducersConsumers whether getProducerSystemUsage() and
- * getConsumerSystemUsage() create separate child usages
- */
- public void setSplitSystemUsageForProducersConsumers(boolean splitSystemUsageForProducersConsumers) {
- this.splitSystemUsageForProducersConsumers = splitSystemUsageForProducersConsumers;
- }
-
- /**
- * @return the value of the monitorConnectionSplits flag
- */
- public boolean isMonitorConnectionSplits() {
- return monitorConnectionSplits;
- }
-
- /**
- * @param monitorConnectionSplits the new value of the monitorConnectionSplits flag
- */
- public void setMonitorConnectionSplits(boolean monitorConnectionSplits) {
- this.monitorConnectionSplits = monitorConnectionSplits;
- }
-
- /**
- * @return the priority passed to the broker task runner factory
- */
- public int getTaskRunnerPriority() {
- return taskRunnerPriority;
- }
-
- /**
- * @param taskRunnerPriority the priority passed to the broker task runner factory
- */
- public void setTaskRunnerPriority(int taskRunnerPriority) {
- this.taskRunnerPriority = taskRunnerPriority;
- }
-
- /**
- * @return the value of the dedicatedTaskRunner flag passed to the task runner factories
- */
- public boolean isDedicatedTaskRunner() {
- return dedicatedTaskRunner;
- }
-
- /**
- * @param dedicatedTaskRunner the new value of the dedicatedTaskRunner flag
- */
- public void setDedicatedTaskRunner(boolean dedicatedTaskRunner) {
- this.dedicatedTaskRunner = dedicatedTaskRunner;
- }
-
- /**
- * @return the value of the cacheTempDestinations flag
- */
- public boolean isCacheTempDestinations() {
- return cacheTempDestinations;
- }
-
- /**
- * @param cacheTempDestinations the new value of the cacheTempDestinations flag
- */
- public void setCacheTempDestinations(boolean cacheTempDestinations) {
- this.cacheTempDestinations = cacheTempDestinations;
- }
-
- /**
- * @return the configured timeBeforePurgeTempDestinations value
- */
- public int getTimeBeforePurgeTempDestinations() {
- return timeBeforePurgeTempDestinations;
- }
-
- /**
- * @param timeBeforePurgeTempDestinations the new timeBeforePurgeTempDestinations value
- */
- public void setTimeBeforePurgeTempDestinations(int timeBeforePurgeTempDestinations) {
- this.timeBeforePurgeTempDestinations = timeBeforePurgeTempDestinations;
- }
-
- /**
- * @return the value of the useTempMirroredQueues flag
- */
- public boolean isUseTempMirroredQueues() {
- return useTempMirroredQueues;
- }
-
- /**
- * @param useTempMirroredQueues the new value of the useTempMirroredQueues flag
- */
- public void setUseTempMirroredQueues(boolean useTempMirroredQueues) {
- this.useTempMirroredQueues = useTempMirroredQueues;
- }
-
- /**
- * Returns the job scheduler store, creating one on first use. Returns null
- * when scheduler support is off. Otherwise the first option that works is
- * used: an in-memory store when the broker is not persistent; a store
- * created by the persistence adapter; the persistence adapter itself when
- * it implements JobSchedulerStore; finally a store created by a
- * reflectively loaded KahaDBPersistenceAdapter.
- *
- * @return the job scheduler store, or null when scheduler support is disabled
- * @throws RuntimeException wrapping any failure while creating the store
- */
- public synchronized JobSchedulerStore getJobSchedulerStore() {
-
- // If support is off don't allow any scheduler even if the user configured their own.
- if (!isSchedulerSupport()) {
- return null;
- }
-
- // If the user configured their own we use it even if persistence is disabled since
- // we don't know anything about their implementation.
- if (jobSchedulerStore == null) {
-
- if (!isPersistent()) {
- this.jobSchedulerStore = new InMemoryJobSchedulerStore();
- configureService(jobSchedulerStore);
- return this.jobSchedulerStore;
- }
-
- // First choice: let the persistence adapter create a dedicated scheduler store.
- try {
- PersistenceAdapter pa = getPersistenceAdapter();
- if (pa != null) {
- this.jobSchedulerStore = pa.createJobSchedulerStore();
- jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
- configureService(jobSchedulerStore);
- return this.jobSchedulerStore;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- } catch (UnsupportedOperationException ex) {
- // It's ok if the store doesn't implement a scheduler.
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
-
- // Second choice: the persistence adapter doubles as the scheduler store.
- try {
- PersistenceAdapter pa = getPersistenceAdapter();
- if (pa != null && pa instanceof JobSchedulerStore) {
- this.jobSchedulerStore = (JobSchedulerStore) pa;
- configureService(jobSchedulerStore);
- return this.jobSchedulerStore;
- }
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
-
- // Load the KahaDB store as a last resort, this only works if KahaDB is
- // included at runtime, otherwise this will fail. User should disable
- // scheduler support if this fails.
- try {
- String clazz = "org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter";
- PersistenceAdapter adaptor = (PersistenceAdapter)getClass().getClassLoader().loadClass(clazz).newInstance();
- jobSchedulerStore = adaptor.createJobSchedulerStore();
- jobSchedulerStore.setDirectory(getSchedulerDirectoryFile());
- configureService(jobSchedulerStore);
- LOG.info("JobScheduler using directory: {}", getSchedulerDirectoryFile());
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
- return jobSchedulerStore;
- }
-
- /**
- * Stores the given job scheduler store and configures it as a service.
- *
- * @param jobSchedulerStore the store to use instead of a lazily created one
- */
- public void setJobSchedulerStore(JobSchedulerStore jobSchedulerStore) {
- this.jobSchedulerStore = jobSchedulerStore;
- configureService(jobSchedulerStore);
- }
-
- //
- // Implementation methods
- // -------------------------------------------------------------------------
- /**
-  * Handles any lazy-creation helper properties which are added to make
-  * things easier to configure inside environments such as Spring: every
-  * configured transport connector URI, network connector URI and JMS bridge
-  * connector is registered through the matching add method.
-  *
-  * @throws Exception
-  */
- protected void processHelperProperties() throws Exception {
-     if (transportConnectorURIs != null) {
-         for (String transportUri : transportConnectorURIs) {
-             addConnector(transportUri);
-         }
-     }
-     if (networkConnectorURIs != null) {
-         for (String networkUri : networkConnectorURIs) {
-             addNetworkConnector(networkUri);
-         }
-     }
-     if (jmsBridgeConnectors != null) {
-         for (JmsConnector bridge : jmsBridgeConnectors) {
-             addJmsConnector(bridge);
-         }
-     }
- }
-
- // NOTE(review): nothing in the visible part of this class reads or writes this
- // public mutable static field - presumably leftover debugging state; confirm before removing.
- public static int t = 0;
- /**
-  * Check that the store usage limit is not greater than max usable
-  * space and adjust if it is. When a percentage limit is configured and
-  * diskUsageCheckRegrowPercentChange is enabled, the limit is first
-  * recomputed if more disk space has become available. Finally an error is
-  * logged if the limit is smaller than the store's maximum journal file
-  * size.
-  *
-  * @throws IOException propagated from getPersistenceAdapter()
-  */
- protected void checkStoreUsageLimits() throws IOException {
-     final SystemUsage usage = getSystemUsage();
-
-     if (getPersistenceAdapter() != null) {
-         PersistenceAdapter adapter = getPersistenceAdapter();
-         File dir = adapter.getDirectory();
-
-         if (dir != null) {
-             dir = StoreUtil.findParentDirectory(dir);
-
-             int usagePercent = usage.getStoreUsage().getPercentLimit();
-             long storeLimit = usage.getStoreUsage().getLimit();
-             long storeCurrent = usage.getStoreUsage().getUsage();
-             long dirFreeSpace = dir.getUsableSpace();
-
-             if (diskUsageCheckRegrowPercentChange && usagePercent > 0 &&
-                     storeLimit < (dirFreeSpace + storeCurrent)) {
-                 // The stray second '+' (a unary plus) was removed; the logged text is unchanged.
-                 LOG.info("Usable disk space has been increased, attempting to regrow store limit to "
-                         + usagePercent + "% of the partition size.");
-
-                 //trigger the recomputation of the absolute value by
-                 //calling setPercentLimit again
-                 usage.getStoreUsage().setPercentLimit(usagePercent);
-                 storeLimit = usage.getStoreUsage().getLimit();
-                 storeCurrent = usage.getStoreUsage().getUsage();
-             }
-
-             // The store may grow into the free space plus what it already occupies.
-             if (storeLimit > (dirFreeSpace + storeCurrent)) {
-                 if (usagePercent > 0) {
-                     LOG.warn("Store limit has been set to "
-                             + usagePercent + "% of the partition size but "
-                             + "there is not enough usable space.");
-                 }
-
-                 LOG.warn("Store limit is " + storeLimit / (1024 * 1024) +
-                         " mb (current store usage is " + storeCurrent / (1024 * 1024) +
-                         " mb). The data directory: " + dir.getAbsolutePath() +
-                         " only has " + dirFreeSpace / (1024 * 1024) +
-                         " mb of usable space - resetting to maximum available disk space: " +
-                         (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb");
-                 usage.getStoreUsage().setLimit(dirFreeSpace + storeCurrent);
-             }
-         }
-
-         long maxJournalFileSize = 0;
-         // Re-read the limit: it may have been lowered just above.
-         long storeLimit = usage.getStoreUsage().getLimit();
-
-         if (adapter instanceof JournaledStore) {
-             maxJournalFileSize = ((JournaledStore) adapter).getJournalMaxFileLength();
-         }
-
-         if (storeLimit < maxJournalFileSize) {
-             LOG.error("Store limit is " + storeLimit / (1024 * 1024) +
-                     " mb, whilst the max journal file size for the store is: " +
-                     maxJournalFileSize / (1024 * 1024) + " mb, " +
-                     "the store will not accept any data when used.");
-
-         }
-     }
- }
-
- /**
- * Check that temporary usage limit is not greater than max usable
- * space and adjust if it is
- * <p>
- * NOTE(review): the body still contains unresolved merge-conflict markers
- * ({@code <<<<<<<}, {@code =======}, {@code >>>>>>>}), so it does not
- * compile as written. The first variant lowers the temporary limit to the
- * usable space plus the current temporary store usage; the second variant
- * lowers it to the usable space alone and additionally warns when a
- * percent limit is configured. One side has to be chosen before this
- * code can be used.
- * <p>
- * When the broker is persistent, an error is also logged if the limit is
- * smaller than the maximum journal file length of the temporary store
- * (or {@code DEFAULT_MAX_FILE_LENGTH} when the temporary store is not a
- * {@code JournaledStore}). That comparison uses the limit as read before
- * any adjustment made earlier in the method.
- *
- * @throws IOException declared by the signature; which call raises it is
- * not visible in this method - confirm against the callees
- */
- protected void checkTmpStoreUsageLimits() throws IOException {
- final SystemUsage usage = getSystemUsage();
-
- File tmpDir = getTmpDataDirectory();
-
- if (tmpDir != null) {
- tmpDir = StoreUtil.findParentDirectory(tmpDir); // presumably resolves to an existing parent directory - confirm against StoreUtil
-
- long storeLimit = usage.getTempUsage().getLimit();
-<<<<<<< 4f8d56aaf60f99abe643e79c6c4940a571289a86
- long storeCurrent = usage.getTempUsage().getUsage();
- while (tmpDir != null && !tmpDir.isDirectory()) {
- tmpDir = tmpDir.getParentFile();
- }
- long dirFreeSpace = tmpDir.getUsableSpace();
- if (storeLimit > (dirFreeSpace + storeCurrent)) {
- LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) +
- " mb (current temporary store usage is " + storeCurrent / (1024 * 1024) +
- " mb). The temporary data directory: " + tmpDir.getAbsolutePath() +
- " only has " + dirFreeSpace / (1024 * 1024) +
- " mb of usable space - resetting to maximum available disk space: " +
- (dirFreeSpace + storeCurrent) / (1024 * 1024) + " mb");
- usage.getTempUsage().setLimit(dirFreeSpace + storeCurrent);
-=======
- long dirFreeSpace = tmpDir.getUsableSpace();
- int usagePercent = usage.getTempUsage().getPercentLimit();
-
-// if (diskUsageCheckRegrowPercentChange && usagePercent > 0 &&
-// storeLimit < (dirFreeSpace + storeCurrent)) {
-// LOG.info("Usable disk space has been increased, attempting to regrow store limit to " +
-// + usagePercent + "% of the partition size.");
-//
-// //trigger the recomputation of the absolute value by
-// //calling setPercentLimit again
-// usage.getStoreUsage().setPercentLimit(usagePercent);
-// storeLimit = usage.getStoreUsage().getLimit();
-// storeCurrent = usage.getStoreUsage().getUsage();
-// }
-
- if (storeLimit > dirFreeSpace) {
- if (usagePercent > 0) {
- LOG.warn("Temporary Store limit has been set to "
- + usagePercent + "% of the partition size but "
- + "there is not enough usable space.");
- }
-
- LOG.warn("Temporary Store limit is " + storeLimit / (1024 * 1024) +
- " mb, whilst the temporary data directory: " + tmpDir.getAbsolutePath() +
- " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to maximum available " +
- dirFreeSpace / (1024 * 1024) + " mb.");
- usage.getTempUsage().setLimit(dirFreeSpace);
->>>>>>> https://issues.apache.org/jira/browse/AMQ-5965
- }
-
- if (isPersistent()) {
- long maxJournalFileSize;
-
- PListStore store = usage.getTempUsage().getStore();
- if (store != null && store instanceof JournaledStore) {
- maxJournalFileSize = ((JournaledStore) store).getJournalMaxFileLength();
- } else {
- maxJournalFileSize = DEFAULT_MAX_FILE_LENGTH;
- }
-
- if (storeLimit < maxJournalFileSize) { // storeLimit is not re-read after setLimit() above, so this is the pre-adjustment value
- LOG.error("Temporary Store limit is " + storeLimit / (1024 * 1024) +
- " mb, whilst the max journal file size for the temporary store is: " +
- maxJournalFileSize / (1024 * 1024) + " mb, " +
- "the temp store will not accept any data when used.");
- }
- }
- }
- }
-
- /**
- * Schedules a periodic task based on schedulePeriodForDiskLimitCheck to
- * update store and temporary store limits if the amount of available space
- * plus current store size is less than the existin configured limit
- */
- protected void scheduleDiskUsageLimitsCheck() throws IOException {
- if (schedulePeriodForDiskUsageCheck > 0 &&
- (getPersistenceAdapter() != null || getTmpDataDirectory() != null)) {
- Runnable diskLimitCheckTask = new Runnable() {
- @Override
- public void run() {
- try {
- checkStoreUsageLimits();
- } catch (IOException e) {
- LOG.error("Failed to check persistent disk usage limits", e);
- }
-
- try {
- checkTmpStoreUsageLimits();
- } catch (IOException e) {
- LOG.error("Failed to check temporary store usage limits", e);
- }
- }
- };
- scheduler.executePeriodically(diskLimitCheckTask, schedulePeriodForDiskUsageCheck);
- }
- }
-
- /**
- * Validates the configured usage limits against what the JVM and the file
- * system can provide, lowering limits that cannot be met.
- * <ul>
- * <li>If the memory limit exceeds {@code Runtime.maxMemory()}, it is reset
- * to 70% of the JVM heap and a warning is logged.</li>
- * <li>The persistent store and temporary store limits are checked and the
- * periodic disk usage check is scheduled.</li>
- * <li>If a job scheduler store with a directory is configured, its limit
- * is lowered to the usable space of the nearest existing directory. When
- * no existing directory can be found the check is skipped with a warning
- * instead of failing with a {@code NullPointerException}.</li>
- * </ul>
- *
- * @throws IOException propagated from the store and temporary store limit
- * checks or from scheduling the periodic check
- */
- protected void checkSystemUsageLimits() throws IOException {
-     final SystemUsage usage = getSystemUsage();
-     long memLimit = usage.getMemoryUsage().getLimit();
-     long jvmLimit = Runtime.getRuntime().maxMemory();
-
-     if (memLimit > jvmLimit) {
-         usage.getMemoryUsage().setPercentOfJvmHeap(70);
-         // The last value logged is re-read on purpose: it is the limit after the reset.
-         LOG.warn("Memory Usage for the Broker (" + memLimit / (1024 * 1024) +
-                 " mb) is more than the maximum available for the JVM: " +
-                 jvmLimit / (1024 * 1024) + " mb - resetting to 70% of maximum available: " + (usage.getMemoryUsage().getLimit() / (1024 * 1024)) + " mb");
-     }
-
-     //Check the persistent store and temp store limits if they exist
-     //and schedule a periodic check to update disk limits if
-     //schedulePeriodForDiskLimitCheck is set
-     checkStoreUsageLimits();
-     checkTmpStoreUsageLimits();
-     scheduleDiskUsageLimitsCheck();
-
-     // Looked up once and named jobStore so the local does not shadow the scheduler field.
-     final JobSchedulerStore jobStore = getJobSchedulerStore();
-     if (jobStore == null) {
-         return;
-     }
-     File schedulerDir = jobStore.getDirectory();
-     if (schedulerDir == null) {
-         return;
-     }
-
-     if (!schedulerDir.isAbsolute()) {
-         schedulerDir = new File(schedulerDir.getAbsolutePath());
-     }
-
-     // Walk up until an existing directory is found; getParentFile() returns
-     // null once the root is passed, e.g. for a path on a missing drive.
-     while (schedulerDir != null && !schedulerDir.isDirectory()) {
-         schedulerDir = schedulerDir.getParentFile();
-     }
-     if (schedulerDir == null) {
-         LOG.warn("Could not find an existing directory for the Job Scheduler Store: " +
-                 jobStore.getDirectory() + " - skipping the Job Scheduler Store limit check.");
-         return;
-     }
-
-     long schedulerLimit = usage.getJobSchedulerUsage().getLimit();
-     long dirFreeSpace = schedulerDir.getUsableSpace();
-     if (schedulerLimit > dirFreeSpace) {
-         LOG.warn("Job Scheduler Store limit is " + schedulerLimit / (1024 * 1024) +
-                 " mb, whilst the data directory: " + schedulerDir.getAbsolutePath() +
-                 " only has " + dirFreeSpace / (1024 * 1024) + " mb of usable space - resetting to " +
-                 dirFreeSpace / (1024 * 1024) + " mb.");
-         usage.getJobSchedulerUsage().setLimit(dirFreeSpace);
-     }
- }
-
- /**
- * Stops every network, proxy, JMS and transport connector of this broker
- * through the given stopper.
- * <p>
- * Network and transport connectors have their MBean unregistered before
- * they are stopped. A failure to unregister a transport connector MBean is
- * logged and otherwise ignored so the connector is still stopped.
- *
- * @param stopper the stopper used to stop each connector
- */
- public void stopAllConnectors(ServiceStopper stopper) {
-     for (NetworkConnector connector : getNetworkConnectors()) {
-         unregisterNetworkConnectorMBean(connector);
-         stopper.stop(connector);
-     }
-     for (ProxyConnector connector : getProxyConnectors()) {
-         stopper.stop(connector);
-     }
-     for (JmsConnector connector : jmsConnectors) {
-         stopper.stop(connector);
-     }
-     for (TransportConnector connector : getTransportConnectors()) {
-         try {
-             unregisterConnectorMBean(connector);
-         } catch (IOException e) {
-             // Best effort: a JMX failure must not prevent the connector from being stopped.
-             LOG.warn("Transport Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
-         }
-         stopper.stop(connector);
-     }
- }
-
- protected TransportConnector registerConnectorMBean(TransportConnector connector) throws IOException {
- try {
- ObjectName objectName = createConnectorObjectName(connector);
- connector = connector.asManagedConnector(getManagementContext(), objectName);
- ConnectorViewMBean view = new ConnectorView(connector);
- AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
- return connector;
- } catch (Throwable e) {
- throw IOExceptionSupport.create("Transport Connector could not be registered in JMX: " + e, e);
- }
- }
-
- protected void unregisterConnectorMBean(TransportConnector connector) throws IOException {
- if (isUseJmx()) {
- try {
- ObjectName objectName = createConnectorObjectName(connector);
- getManagementContext().unregisterMBean(objectName);
- } catch (Throwable e) {
- throw IOExceptionSupport.create(
- "Transport Connector could not be unregistered in JMX: " + e.getMessage(), e);
- }
- }
- }
-
- /**
- * Hook for exposing a persistence adapter over JMX. This implementation
- * registers nothing and hands the adapter back unchanged.
- *
- * @param adaptor the persistence adapter
- * @return the same {@code adaptor} instance
- * @throws IOException never thrown by this implementation
- */
- protected PersistenceAdapter registerPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
-     return adaptor;
- }
-
- /**
- * Counterpart of {@code registerPersistenceAdapterMBean}. This
- * implementation only queries {@code isUseJmx()} and performs no
- * unregistration.
- *
- * @param adaptor the persistence adapter (unused here)
- * @throws IOException never thrown by this implementation
- */
- protected void unregisterPersistenceAdapterMBean(PersistenceAdapter adaptor) throws IOException {
-     if (!isUseJmx()) {
-         return;
-     }
-     // Nothing to remove: no MBean is registered for the adapter.
- }
-
- /**
- * Builds the JMX object name of a transport connector from the broker's
- * object name, the {@code "clientConnectors"} type and the connector name.
- *
- * @param connector the transport connector to name
- * @return the object name produced by {@code BrokerMBeanSupport}
- * @throws MalformedObjectNameException if the name cannot be built
- */
- private ObjectName createConnectorObjectName(TransportConnector connector) throws MalformedObjectNameException {
-     return BrokerMBeanSupport.createConnectorName(
-             getBrokerObjectName(),
-             "clientConnectors",
-             connector.getName());
- }
-
- /**
- * Registers a {@code NetworkConnectorView} MBean for the connector and
- * stores the generated object name on the connector.
- *
- * @param connector the network connector to expose
- * @throws IOException wrapping any {@link Throwable} raised while creating
- * the object name or registering the MBean
- */
- public void registerNetworkConnectorMBean(NetworkConnector connector) throws IOException {
-     // The view is built outside the try block, so a failure here is not wrapped.
-     final NetworkConnectorViewMBean mbean = new NetworkConnectorView(connector);
-     try {
-         final ObjectName name = createNetworkConnectorObjectName(connector);
-         connector.setObjectName(name);
-         AnnotatedMBean.registerMBean(getManagementContext(), mbean, name);
-     } catch (Throwable e) {
-         throw IOExceptionSupport.create("Network Connector could not be registered in JMX: " + e.getMessage(), e);
-     }
- }
-
- /**
- * Builds the JMX object name of a network connector from the broker's
- * object name, the {@code "networkConnectors"} type and the connector name.
- *
- * @param connector the network connector to name
- * @return the object name produced by {@code BrokerMBeanSupport}
- * @throws MalformedObjectNameException if the name cannot be built
- */
- protected ObjectName createNetworkConnectorObjectName(NetworkConnector connector) throws MalformedObjectNameException {
-     return BrokerMBeanSupport.createNetworkConnectorName(
-             getBrokerObjectName(),
-             "networkConnectors",
-             connector.getName());
- }
-
- /**
- * Builds the JMX object name of a duplex network connector from the
- * broker's object name, the {@code "duplexNetworkConnectors"} type and the
- * given transport string.
- *
- * @param transport the value used as the connector name part
- * @return the object name produced by {@code BrokerMBeanSupport}
- * @throws MalformedObjectNameException if the name cannot be built
- */
- public ObjectName createDuplexNetworkConnectorObjectName(String transport) throws MalformedObjectNameException {
-     return BrokerMBeanSupport.createNetworkConnectorName(
-             getBrokerObjectName(),
-             "duplexNetworkConnectors",
-             transport);
- }
-
- protected void unregisterNetworkConnectorMBean(NetworkConnector connector) {
- if (isUseJmx()) {
- try {
- ObjectName objectName = createNetworkConnectorObjectName(connector);
- getManagementContext().unregisterMBean(objectName);
- } catch (Exception e) {
- LOG.warn("Network Connector could not be unregistered from JMX due " + e.getMessage() + ". This exception is ignored.", e);
- }
- }
- }
-
- /**
- * Registers a {@code ProxyConnectorView} MBean for the connector under the
- * {@code "proxyConnectors"} type.
- *
- * @param connector the proxy connector to expose
- * @throws IOException wrapping any {@link Throwable} raised while creating
- * the object name or registering the MBean
- */
- protected void registerProxyConnectorMBean(ProxyConnector connector) throws IOException {
-     ProxyConnectorView view = new ProxyConnectorView(connector);
-     try {
-         ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "proxyConnectors", connector.getName());
-         AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
-     } catch (Throwable e) {
-         // The message names the proxy connector: this method does not register the broker.
-         throw IOExceptionSupport.create("Proxy Connector could not be registered in JMX: " + e.getMessage(), e);
-     }
- }
-
- /**
- * Registers a {@code JmsConnectorView} MBean for the connector under the
- * {@code "jmsConnectors"} type.
- *
- * @param connector the JMS connector to expose
- * @throws IOException wrapping any {@link Throwable} raised while creating
- * the object name or registering the MBean
- */
- protected void registerJmsConnectorMBean(JmsConnector connector) throws IOException {
-     JmsConnectorView view = new JmsConnectorView(connector);
-     try {
-         ObjectName objectName = BrokerMBeanSupport.createNetworkConnectorName(getBrokerObjectName(), "jmsConnectors", connector.getName());
-         AnnotatedMBean.registerMBean(getManagementContext(), view, objectName);
-     } catch (Throwable e) {
-         // The message names the JMS connector: this method does not register the broker.
-         throw IOExceptionSupport.create("JMS Connector could not be registered in JMX: " + e.getMessage(), e);
-     }
- }
-
- /**
- * Factory method to create a new broker
- *
- * @
<TRUNCATED>
[2/2] activemq git commit: Removing inadvertantly checked in file
Posted by cs...@apache.org.
Removing inadvertantly checked in file
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/00d19e7b
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/00d19e7b
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/00d19e7b
Branch: refs/heads/master
Commit: 00d19e7b66e1d6e35adb43336fb5ec5bad41093e
Parents: cc9b9b0
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Mon Sep 21 17:13:36 2015 +0000
Committer: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Committed: Mon Sep 21 17:13:36 2015 +0000
----------------------------------------------------------------------
.../activemq/broker/BrokerService.java.orig | 3147 ------------------
1 file changed, 3147 deletions(-)
----------------------------------------------------------------------