You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 16:42:51 UTC
svn commit: r1187375 [20/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/broker/etc/config.xml
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/etc/config.xml?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/etc/config.xml (original)
+++ qpid/branches/QPID-2519/java/broker/etc/config.xml Fri Oct 21 14:42:12 2011
@@ -30,28 +30,27 @@
<connector>
<!-- To enable SSL edit the keystorePath and keystorePassword
and set enabled to true.
- To disasble Non-SSL port set sslOnly to true -->
+ To disable Non-SSL port set sslOnly to true -->
<ssl>
<enabled>false</enabled>
+ <port>5671</port>
<sslOnly>false</sslOnly>
- <keystorePath>/path/to/keystore.ks</keystorePath>
- <keystorePassword>keystorepass</keystorePassword>
+ <keyStorePath>/path/to/keystore.ks</keyStorePath>
+ <keyStorePassword>keystorepass</keyStorePassword>
</ssl>
- <qpidnio>false</qpidnio>
- <protectio>
- <enabled>false</enabled>
- <readBufferLimitSize>262144</readBufferLimitSize>
- <writeBufferLimitSize>262144</writeBufferLimitSize>
- </protectio>
- <transport>nio</transport>
<port>5672</port>
- <sslport>8672</sslport>
- <socketReceiveBuffer>32768</socketReceiveBuffer>
- <socketSendBuffer>32768</socketSendBuffer>
+ <socketReceiveBuffer>262144</socketReceiveBuffer>
+ <socketSendBuffer>262144</socketSendBuffer>
</connector>
<management>
<enabled>true</enabled>
- <jmxport>8999</jmxport>
+ <jmxport>
+ <registryServer>8999</registryServer>
+ <!--
+ If unspecified, connectorServer defaults to 100 + registryServer port.
+ <connectorServer>9099</connectorServer>
+ -->
+ </jmxport>
<ssl>
<enabled>false</enabled>
<!-- Update below path to your keystore location, or run the bin/create-example-ssl-stores(.sh|.bat)
@@ -69,10 +68,8 @@
</advanced>
<security>
- <principal-databases>
- <!-- Example use of Base64 encoded MD5 hashes for authentication via CRAM-MD5-Hashed -->
+ <pd-auth-manager>
<principal-database>
- <name>passwordfile</name>
<class>org.apache.qpid.server.security.auth.database.PlainPasswordFilePrincipalDatabase</class>
<attributes>
<attribute>
@@ -81,16 +78,11 @@
</attribute>
</attributes>
</principal-database>
- </principal-databases>
+ </pd-auth-manager>
<allow-all />
<msg-auth>false</msg-auth>
-
- <jmx>
- <access>${conf}/jmxremote.access</access>
- <principal-database>passwordfile</principal-database>
- </jmx>
</security>
<virtualhosts>${conf}/virtualhosts.xml</virtualhosts>
Modified: qpid/branches/QPID-2519/java/broker/etc/qpid-server.conf.jpp
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/etc/qpid-server.conf.jpp?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/etc/qpid-server.conf.jpp (original)
+++ qpid/branches/QPID-2519/java/broker/etc/qpid-server.conf.jpp Fri Oct 21 14:42:12 2011
@@ -17,8 +17,7 @@
# under the License.
#
-QPID_LIBS=$(build-classpath backport-util-concurrent \
- commons-beanutils \
+QPID_LIBS=$(build-classpath commons-beanutils \
commons-beanutils-core \
commons-cli \
commons-codec \
Modified: qpid/branches/QPID-2519/java/broker/etc/virtualhosts.xml
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/etc/virtualhosts.xml?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/etc/virtualhosts.xml (original)
+++ qpid/branches/QPID-2519/java/broker/etc/virtualhosts.xml Fri Oct 21 14:42:12 2011
@@ -31,7 +31,7 @@
<housekeeping>
<threadCount>2</threadCount>
- <expiredMessageCheckPeriod>20000</expiredMessageCheckPeriod>
+ <checkPeriod>20000</checkPeriod>
</housekeeping>
<exchanges>
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Fri Oct 21 14:42:12 2011
@@ -694,7 +694,8 @@ public class QMFService implements Confi
public BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommand queueMoveMessages(final BrokerSchema.BrokerClass.QueueMoveMessagesMethodResponseCommandFactory factory,
final String srcQueue,
final String destQueue,
- final Long qty)
+ final Long qty,
+ final Map filter) // TODO: move based on group identifier
{
// TODO
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
@@ -712,6 +713,46 @@ public class QMFService implements Confi
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
+ public BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommand getTimestampConfig(final BrokerSchema.BrokerClass.GetTimestampConfigMethodResponseCommandFactory factory)
+ {
+ // TODO: timestamp support
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommand setTimestampConfig(final BrokerSchema.BrokerClass.SetTimestampConfigMethodResponseCommandFactory factory,
+ final java.lang.Boolean receive)
+ {
+ // TODO: timestamp support
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.CreateMethodResponseCommand create(final BrokerSchema.BrokerClass.CreateMethodResponseCommandFactory factory,
+ final String type,
+ final String name,
+ final Map properties,
+ final java.lang.Boolean lenient)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.DeleteMethodResponseCommand delete(final BrokerSchema.BrokerClass.DeleteMethodResponseCommandFactory factory,
+ final String type,
+ final String name,
+ final Map options)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
+ public BrokerSchema.BrokerClass.QueryMethodResponseCommand query(final BrokerSchema.BrokerClass.QueryMethodResponseCommandFactory factory,
+ final String type,
+ final String name)
+ {
+ //TODO:
+ return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
+ }
+
public UUID getId()
{
return _obj.getId();
@@ -1072,8 +1113,19 @@ public class QMFService implements Confi
return 0l;
}
+ public Boolean getFlowStopped()
+ {
+ return Boolean.FALSE;
+ }
+
+ public Long getFlowStoppedCount()
+ {
+ return 0L;
+ }
+
public BrokerSchema.QueueClass.PurgeMethodResponseCommand purge(final BrokerSchema.QueueClass.PurgeMethodResponseCommandFactory factory,
- final Long request)
+ final Long request,
+ final Map filter) // TODO: support for purge-by-group-identifier
{
try
{
@@ -1089,7 +1141,8 @@ public class QMFService implements Confi
public BrokerSchema.QueueClass.RerouteMethodResponseCommand reroute(final BrokerSchema.QueueClass.RerouteMethodResponseCommandFactory factory,
final Long request,
final Boolean useAltExchange,
- final String exchange)
+ final String exchange,
+ final Map filter) // TODO: support for re-route-by-group-identifier
{
//TODO
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
@@ -1282,6 +1335,23 @@ public class QMFService implements Confi
{
return _obj.isShadow();
}
+
+ public Boolean getUserProxyAuth()
+ {
+ // TODO
+ return false;
+ }
+
+ public String getSaslMechanism()
+ {
+ // TODO
+ return null;
+ }
+ public Integer getSaslSsf()
+ {
+ // TODO
+ return 0;
+ }
}
private class SessionDelegate implements BrokerSchema.SessionDelegate
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java Fri Oct 21 14:42:12 2011
@@ -327,4 +327,74 @@ public class AMQBrokerManagerMBean exten
{
return getObjectNameForSingleInstanceMBean();
}
+
+ public void resetStatistics() throws Exception
+ {
+ getVirtualHost().resetStatistics();
+ }
+
+ public double getPeakMessageDeliveryRate()
+ {
+ return getVirtualHost().getMessageDeliveryStatistics().getPeak();
+ }
+
+ public double getPeakDataDeliveryRate()
+ {
+ return getVirtualHost().getDataDeliveryStatistics().getPeak();
+ }
+
+ public double getMessageDeliveryRate()
+ {
+ return getVirtualHost().getMessageDeliveryStatistics().getRate();
+ }
+
+ public double getDataDeliveryRate()
+ {
+ return getVirtualHost().getDataDeliveryStatistics().getRate();
+ }
+
+ public long getTotalMessagesDelivered()
+ {
+ return getVirtualHost().getMessageDeliveryStatistics().getTotal();
+ }
+
+ public long getTotalDataDelivered()
+ {
+ return getVirtualHost().getDataDeliveryStatistics().getTotal();
+ }
+
+ public double getPeakMessageReceiptRate()
+ {
+ return getVirtualHost().getMessageReceiptStatistics().getPeak();
+ }
+
+ public double getPeakDataReceiptRate()
+ {
+ return getVirtualHost().getDataReceiptStatistics().getPeak();
+ }
+
+ public double getMessageReceiptRate()
+ {
+ return getVirtualHost().getMessageReceiptStatistics().getRate();
+ }
+
+ public double getDataReceiptRate()
+ {
+ return getVirtualHost().getDataReceiptStatistics().getRate();
+ }
+
+ public long getTotalMessagesReceived()
+ {
+ return getVirtualHost().getMessageReceiptStatistics().getTotal();
+ }
+
+ public long getTotalDataReceived()
+ {
+ return getVirtualHost().getDataReceiptStatistics().getTotal();
+ }
+
+ public boolean isStatisticsEnabled()
+ {
+ return getVirtualHost().isStatisticsEnabled();
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Oct 21 14:42:12 2011
@@ -22,6 +22,7 @@ package org.apache.qpid.server;
import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQMethodBody;
@@ -49,6 +50,7 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.actors.AMQPChannelActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
@@ -74,6 +76,7 @@ import org.apache.qpid.server.txn.AutoCo
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.TransportException;
import java.util.ArrayList;
import java.util.Collection;
@@ -136,11 +139,12 @@ public class AMQChannel implements Sessi
private final AtomicBoolean _suspended = new AtomicBoolean(false);
private ServerTransaction _transaction;
-
+
private final AtomicLong _txnStarts = new AtomicLong(0);
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
+ private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
@@ -199,7 +203,12 @@ public class AMQChannel implements Sessi
// theory
return !(_transaction instanceof AutoCommitTransaction);
}
-
+
+ public boolean inTransaction()
+ {
+ return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
+ }
+
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -209,7 +218,7 @@ public class AMQChannel implements Sessi
_txnCount.compareAndSet(0,1);
}
}
-
+
private void decrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -295,7 +304,6 @@ public class AMQChannel implements Sessi
});
deliverCurrentMessageIfComplete();
-
}
}
@@ -308,7 +316,6 @@ public class AMQChannel implements Sessi
try
{
_currentMessage.getStoredMessage().flushToStore();
-
final ArrayList<? extends BaseQueue> destinationQueues = _currentMessage.getDestinationQueues();
if(!checkMessageUserId(_currentMessage.getContentHeader()))
@@ -317,7 +324,7 @@ public class AMQChannel implements Sessi
}
else
{
- if(destinationQueues == null || _currentMessage.getDestinationQueues().isEmpty())
+ if(destinationQueues == null || destinationQueues.isEmpty())
{
if (_currentMessage.isMandatory() || _currentMessage.isImmediate())
{
@@ -325,7 +332,7 @@ public class AMQChannel implements Sessi
}
else
{
- _logger.warn("MESSAGE DISCARDED: No routes for message - " + createAMQMessage(_currentMessage));
+ _actor.message(ExchangeMessages.DISCARDMSG(_currentMessage.getExchange().asString(), _currentMessage.getRoutingKey()));
}
}
@@ -333,11 +340,15 @@ public class AMQChannel implements Sessi
{
_transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues, isTransactional()));
incrementOutstandingTxnsIfNecessary();
+ updateTransactionalActivity();
}
}
}
finally
{
+ long bodySize = _currentMessage.getSize();
+ long timestamp = ((BasicContentHeaderProperties) _currentMessage.getContentHeader().getProperties()).getTimestamp();
+ _session.registerMessageReceived(bodySize, timestamp);
_currentMessage = null;
}
}
@@ -375,6 +386,13 @@ public class AMQChannel implements Sessi
_currentMessage = null;
throw e;
}
+ catch (RuntimeException e)
+ {
+ // we want to make sure we don't keep a reference to the message in the
+ // event of an error
+ _currentMessage = null;
+ throw e;
+ }
}
protected void routeCurrentMessage() throws AMQException
@@ -425,7 +443,7 @@ public class AMQChannel implements Sessi
{
throw new AMQException("Consumer already exists with same tag: " + tag);
}
-
+
Subscription subscription =
SubscriptionFactoryImpl.INSTANCE.createSubscription(_channelId, _session, tag, acks, filters, noLocal, _creditManager);
@@ -446,6 +464,11 @@ public class AMQChannel implements Sessi
_tag2SubscriptionMap.remove(tag);
throw e;
}
+ catch (RuntimeException e)
+ {
+ _tag2SubscriptionMap.remove(tag);
+ throw e;
+ }
return tag;
}
@@ -503,7 +526,11 @@ public class AMQChannel implements Sessi
}
catch (AMQException e)
{
- _logger.error("Caught AMQException whilst attempting to reque:" + e);
+ _logger.error("Caught AMQException whilst attempting to requeue:" + e);
+ }
+ catch (TransportException e)
+ {
+ _logger.error("Caught TransportException whilst attempting to requeue:" + e);
}
getConfigStore().removeConfiguredObject(this);
@@ -794,6 +821,7 @@ public class AMQChannel implements Sessi
{
Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
+ updateTransactionalActivity();
}
private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
@@ -933,7 +961,7 @@ public class AMQChannel implements Sessi
finally
{
_rollingBack = false;
-
+
_txnRejects.incrementAndGet();
_txnStarts.incrementAndGet();
decrementOutstandingTxnsIfNecessary();
@@ -968,6 +996,17 @@ public class AMQChannel implements Sessi
}
+ /**
+ * Update last transaction activity timestamp
+ */
+ private void updateTransactionalActivity()
+ {
+ if (isTransactional())
+ {
+ _txnUpdateTime.set(System.currentTimeMillis());
+ }
+ }
+
public String toString()
{
return "["+_session.toString()+":"+_channelId+"]";
@@ -1016,6 +1055,7 @@ public class AMQChannel implements Sessi
public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag)
throws AMQException
{
+ _session.registerMessageDelivered(entry.getMessage().getSize());
getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(),
deliveryTag, sub.getConsumerTag());
}
@@ -1056,11 +1096,11 @@ public class AMQChannel implements Sessi
private boolean checkMessageUserId(ContentHeaderBody header)
{
AMQShortString userID =
- header.properties instanceof BasicContentHeaderProperties
- ? ((BasicContentHeaderProperties) header.properties).getUserId()
+ header.getProperties() instanceof BasicContentHeaderProperties
+ ? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
: null;
- return (!MSG_AUTH || _session.getPrincipal().getName().equals(userID == null? "" : userID.toString()));
+ return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
}
@@ -1402,9 +1442,41 @@ public class AMQChannel implements Sessi
{
return _createTime;
}
-
+
public void mgmtClose() throws AMQException
{
_session.mgmtCloseChannel(_channelId);
}
+
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ {
+ if (inTransaction())
+ {
+ long currentTime = System.currentTimeMillis();
+ long openTime = currentTime - _transaction.getTransactionStartTime();
+ long idleTime = currentTime - _txnUpdateTime.get();
+
+ // Log a warning on idle or open transactions
+ if (idleWarn > 0L && idleTime > idleWarn)
+ {
+ CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime));
+ _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms");
+ }
+ else if (openWarn > 0L && openTime > openWarn)
+ {
+ CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime));
+ _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
+ }
+
+ // Close connection for idle or open transactions that have timed out
+ if (idleClose > 0L && idleTime > idleClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+ }
+ else if (openClose > 0L && openTime > openClose)
+ {
+ getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ }
+ }
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/Main.java Fri Oct 21 14:42:12 2011
@@ -20,17 +20,6 @@
*/
package org.apache.qpid.server;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.EnumSet;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
@@ -38,29 +27,9 @@ import org.apache.commons.cli.OptionBuil
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.xml.QpidLog4JConfigurator;
-import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
-import org.apache.qpid.server.information.management.ServerInformationMBean;
-import org.apache.qpid.server.logging.SystemOutMessageLogger;
-import org.apache.qpid.server.logging.actors.BrokerActor;
-import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.management.LoggingManagementMBean;
-import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AMQProtocolEngineFactory;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory.VERSION;
+import org.apache.qpid.server.Broker.InitException;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkDriver;
-import org.apache.qpid.transport.network.mina.MINANetworkDriver;
+
/**
* Main entry point for AMQPD.
@@ -68,41 +37,129 @@ import org.apache.qpid.transport.network
*/
public class Main
{
- private static Logger _logger;
- private static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+ private static final Option OPTION_HELP = new Option("h", "help", false, "print this message");
- public static final String DEFAULT_LOG_CONFIG_FILENAME = "log4j.xml";
- public static final String QPID_HOME = "QPID_HOME";
- private static final int IPV4_ADDRESS_LENGTH = 4;
+ private static final Option OPTION_VERSION = new Option("v", "version", false, "print the version information and exit");
- private static final char IPV4_LITERAL_SEPARATOR = '.';
+ private static final Option OPTION_CONFIG_FILE =
+ OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
+ .create("c");
+
+ private static final Option OPTION_PORT =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("listen on the specified port. Overrides any value in the config file")
+ .withLongOpt("port").create("p");
+
+ private static final Option OPTION_SSLPORT =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("SSL port. Overrides any value in the config file")
+ .withLongOpt("sslport").create("s");
+
+ private static final Option OPTION_EXCLUDE_0_10 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-10").create();
+
+ private static final Option OPTION_EXCLUDE_0_9_1 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-9-1").create();
+
+ private static final Option OPTION_EXCLUDE_0_9 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-9").create();
+
+ private static final Option OPTION_EXCLUDE_0_8 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
+ .withLongOpt("exclude-0-8").create();
+
+ private static final Option OPTION_JMX_PORT_REGISTRY_SERVER =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("listen on the specified management (registry server) port. Overrides any value in the config file")
+ .withLongOpt("jmxregistryport").create("m");
+
+ private static final Option OPTION_JMX_PORT_CONNECTOR_SERVER =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("listen on the specified management (connector server) port. Overrides any value in the config file")
+ .withLongOpt("jmxconnectorport").create();
+
+ private static final Option OPTION_BIND =
+ OptionBuilder.withArgName("address").hasArg()
+ .withDescription("bind to the specified address. Overrides any value in the config file")
+ .withLongOpt("bind").create("b");
+
+ private static final Option OPTION_LOG_CONFIG_FILE =
+ OptionBuilder.withArgName("file").hasArg()
+ .withDescription("use the specified log4j xml configuration file. By "
+ + "default looks for a file named " + BrokerOptions.DEFAULT_LOG_CONFIG_FILE
+ + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
+
+ private static final Option OPTION_LOG_WATCH =
+ OptionBuilder.withArgName("period").hasArg()
+ .withDescription("monitor the log file configuration file for changes. Units are seconds. "
+ + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+
+ private static final Options OPTIONS = new Options();
+
+ static
+ {
+ OPTIONS.addOption(OPTION_HELP);
+ OPTIONS.addOption(OPTION_VERSION);
+ OPTIONS.addOption(OPTION_CONFIG_FILE);
+ OPTIONS.addOption(OPTION_LOG_CONFIG_FILE);
+ OPTIONS.addOption(OPTION_LOG_WATCH);
+ OPTIONS.addOption(OPTION_PORT);
+ OPTIONS.addOption(OPTION_SSLPORT);
+ OPTIONS.addOption(OPTION_EXCLUDE_0_10);
+ OPTIONS.addOption(OPTION_EXCLUDE_0_9_1);
+ OPTIONS.addOption(OPTION_EXCLUDE_0_9);
+ OPTIONS.addOption(OPTION_EXCLUDE_0_8);
+ OPTIONS.addOption(OPTION_BIND);
+
+ OPTIONS.addOption(OPTION_JMX_PORT_REGISTRY_SERVER);
+ OPTIONS.addOption(OPTION_JMX_PORT_CONNECTOR_SERVER);
+ }
- protected static class InitException extends Exception
+ private CommandLine commandLine;
+
+ public static void main(String[] args)
{
- InitException(String msg, Throwable cause)
+ //if the -Dlog4j.configuration property has not been set, enable the init override
+ //to stop Log4J wandering off and picking up the first log4j.xml/properties file it
+ //finds from the classpath when we get the first Loggers
+ if(System.getProperty("log4j.configuration") == null)
{
- super(msg, cause);
+ System.setProperty("log4j.defaultInitOverride", "true");
}
- }
- protected final Options options = new Options();
- protected CommandLine commandLine;
+ new Main(args);
+ }
- protected Main(String[] args)
+ public Main(final String[] args)
{
- setOptions(options);
if (parseCommandline(args))
{
- execute();
+ try
+ {
+ execute();
+ }
+ catch(Throwable e)
+ {
+ System.err.println("Exception during startup: " + e);
+ e.printStackTrace();
+ shutdown(1);
+ }
}
}
- protected boolean parseCommandline(String[] args)
+ protected boolean parseCommandline(final String[] args)
{
try
{
- commandLine = new PosixParser().parse(options, args);
+ commandLine = new PosixParser().parse(OPTIONS, args);
return true;
}
@@ -110,509 +167,129 @@ public class Main
{
System.err.println("Error: " + e.getMessage());
HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("Qpid", options, true);
+ formatter.printHelp("Qpid", OPTIONS, true);
return false;
}
}
- @SuppressWarnings("static-access")
- protected void setOptions(Options options)
- {
- Option help = new Option("h", "help", false, "print this message");
- Option version = new Option("v", "version", false, "print the version information and exit");
- Option configFile =
- OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
- .create("c");
- Option port =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("listen on the specified port. Overrides any value in the config file")
- .withLongOpt("port").create("p");
-
- Option exclude0_10 =
- OptionBuilder.withArgName("exclude-0-10").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-10").create();
-
- Option exclude0_9_1 =
- OptionBuilder.withArgName("exclude-0-9-1").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-9-1").create();
-
-
- Option exclude0_9 =
- OptionBuilder.withArgName("exclude-0-9").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-9").create();
-
-
- Option exclude0_8 =
- OptionBuilder.withArgName("exclude-0-8").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-8").create();
-
-
- Option mport =
- OptionBuilder.withArgName("mport").hasArg()
- .withDescription("listen on the specified management port. Overrides any value in the config file")
- .withLongOpt("mport").create("m");
-
-
- Option bind =
- OptionBuilder.withArgName("bind").hasArg()
- .withDescription("bind to the specified address. Overrides any value in the config file")
- .withLongOpt("bind").create("b");
- Option logconfig =
- OptionBuilder.withArgName("logconfig").hasArg()
- .withDescription("use the specified log4j xml configuration file. By "
- + "default looks for a file named " + DEFAULT_LOG_CONFIG_FILENAME
- + " in the same directory as the configuration file").withLongOpt("logconfig").create("l");
- Option logwatchconfig =
- OptionBuilder.withArgName("logwatch").hasArg()
- .withDescription("monitor the log file configuration file for changes. Units are seconds. "
- + "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
-
- options.addOption(help);
- options.addOption(version);
- options.addOption(configFile);
- options.addOption(logconfig);
- options.addOption(logwatchconfig);
- options.addOption(port);
- options.addOption(exclude0_10);
- options.addOption(exclude0_9_1);
- options.addOption(exclude0_9);
- options.addOption(exclude0_8);
- options.addOption(mport);
- options.addOption(bind);
- }
-
- protected void execute()
+ protected void execute() throws Exception
{
- // note this understands either --help or -h. If an option only has a long name you can use that but if
- // an option has a short name and a long name you must use the short name here.
- if (commandLine.hasOption("h"))
+ BrokerOptions options = new BrokerOptions();
+ String configFile = commandLine.getOptionValue(OPTION_CONFIG_FILE.getOpt());
+ if(configFile != null)
{
- HelpFormatter formatter = new HelpFormatter();
- formatter.printHelp("Qpid", options, true);
+ options.setConfigFile(configFile);
}
- else if (commandLine.hasOption("v"))
- {
- String ver = QpidProperties.getVersionString();
- StringBuilder protocol = new StringBuilder("AMQP version(s) [major.minor]: ");
-
- boolean first = true;
- for (ProtocolVersion pv : ProtocolVersion.getSupportedProtocolVersions())
- {
- if (first)
- {
- first = false;
- }
- else
- {
- protocol.append(", ");
- }
-
- protocol.append(pv.getMajorVersion()).append('-').append(pv.getMinorVersion());
-
- }
-
- System.out.println(ver + " (" + protocol + ")");
- }
- else
+ String logWatchConfig = commandLine.getOptionValue(OPTION_LOG_WATCH.getOpt());
+ if(logWatchConfig != null)
{
- try
- {
- CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
- startup();
- CurrentActor.remove();
- }
- catch (InitException e)
- {
- System.out.println("Initialisation Error : " + e.getMessage());
- shutdown(1);
- }
- catch (Throwable e)
- {
- System.out.println("Error initialising message broker: " + e);
- e.printStackTrace();
- shutdown(1);
- }
+ options.setLogWatchFrequency(Integer.parseInt(logWatchConfig));
}
- }
-
- protected void shutdown(int status)
- {
- ApplicationRegistry.removeAll();
- System.exit(status);
- }
- protected void startup() throws Exception
- {
- final String QpidHome = System.getProperty(QPID_HOME);
- final File defaultConfigFile = new File(QpidHome, DEFAULT_CONFIG_FILE);
- final File configFile = new File(commandLine.getOptionValue("c", defaultConfigFile.getPath()));
- if (!configFile.exists())
- {
- String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
-
- if (QpidHome == null)
- {
- error = error + "\nNote: " + QPID_HOME + " is not set.";
- }
-
- throw new InitException(error, null);
- }
- else
+ String logConfig = commandLine.getOptionValue(OPTION_LOG_CONFIG_FILE.getOpt());
+ if(logConfig != null)
{
- CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
+ options.setLogConfigFile(logConfig);
}
- String logConfig = commandLine.getOptionValue("l");
- String logWatchConfig = commandLine.getOptionValue("w", "0");
-
- int logWatchTime = 0;
- try
- {
- logWatchTime = Integer.parseInt(logWatchConfig);
- }
- catch (NumberFormatException e)
+ String jmxPortRegistryServer = commandLine.getOptionValue(OPTION_JMX_PORT_REGISTRY_SERVER.getOpt());
+ if(jmxPortRegistryServer != null)
{
- System.err.println("Log watch configuration value of " + logWatchConfig + " is invalid. Must be "
- + "a non-negative integer. Using default of zero (no watching configured");
+ options.setJmxPortRegistryServer(Integer.parseInt(jmxPortRegistryServer));
}
- File logConfigFile;
- if (logConfig != null)
- {
- logConfigFile = new File(logConfig);
- configureLogging(logConfigFile, logWatchTime);
- }
- else
+ String jmxPortConnectorServer = commandLine.getOptionValue(OPTION_JMX_PORT_CONNECTOR_SERVER.getLongOpt());
+ if(jmxPortConnectorServer != null)
{
- File configFileDirectory = configFile.getParentFile();
- logConfigFile = new File(configFileDirectory, DEFAULT_LOG_CONFIG_FILENAME);
- configureLogging(logConfigFile, logWatchTime);
+ options.setJmxPortConnectorServer(Integer.parseInt(jmxPortConnectorServer));
}
- ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile);
- ServerConfiguration serverConfig = config.getConfiguration();
- updateManagementPort(serverConfig, commandLine.getOptionValue("m"));
-
- ApplicationRegistry.initialise(config);
-
- // We have already loaded the BrokerMessages class by this point so we
- // need to refresh the locale setting incase we had a different value in
- // the configuration.
- BrokerMessages.reload();
-
- // AR.initialise() sets and removes its own actor so we now need to set the actor
- // for the remainder of the startup, and the default actor if the stack is empty
- CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
- CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
- GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
-
-
- try
+ String bindAddr = commandLine.getOptionValue(OPTION_BIND.getOpt());
+ if (bindAddr != null)
{
- configureLoggingManagementMBean(logConfigFile, logWatchTime);
-
- ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
- configMBean.register();
-
- ServerInformationMBean sysInfoMBean =
- new ServerInformationMBean(QpidProperties.getBuildVersion(), QpidProperties.getReleaseVersion());
- sysInfoMBean.register();
-
-
- String[] portStr = commandLine.getOptionValues("p");
-
- Set<Integer> ports = new HashSet<Integer>();
- Set<Integer> exclude_0_10 = new HashSet<Integer>();
- Set<Integer> exclude_0_9_1 = new HashSet<Integer>();
- Set<Integer> exclude_0_9 = new HashSet<Integer>();
- Set<Integer> exclude_0_8 = new HashSet<Integer>();
-
- if(portStr == null || portStr.length == 0)
- {
-
- parsePortList(ports, serverConfig.getPorts());
- parsePortList(exclude_0_10, serverConfig.getPortExclude010());
- parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
- parsePortList(exclude_0_9, serverConfig.getPortExclude09());
- parsePortList(exclude_0_8, serverConfig.getPortExclude08());
-
- }
- else
- {
- parsePortArray(ports, portStr);
- parsePortArray(exclude_0_10, commandLine.getOptionValues("exclude-0-10"));
- parsePortArray(exclude_0_9_1, commandLine.getOptionValues("exclude-0-9-1"));
- parsePortArray(exclude_0_9, commandLine.getOptionValues("exclude-0-9"));
- parsePortArray(exclude_0_8, commandLine.getOptionValues("exclude-0-8"));
-
- }
-
-
-
-
- String bindAddr = commandLine.getOptionValue("b");
- if (bindAddr == null)
- {
- bindAddr = serverConfig.getBind();
- }
- InetAddress bindAddress = null;
-
-
-
- if (bindAddr.equals("wildcard"))
- {
- bindAddress = new InetSocketAddress(0).getAddress();
- }
- else
- {
- bindAddress = InetAddress.getByAddress(parseIP(bindAddr));
- }
-
- String hostName = bindAddress.getCanonicalHostName();
-
-
- String keystorePath = serverConfig.getKeystorePath();
- String keystorePassword = serverConfig.getKeystorePassword();
- String certType = serverConfig.getCertType();
- SSLContextFactory sslFactory = null;
-
- if (!serverConfig.getSSLOnly())
- {
-
- for(int port : ports)
- {
-
- NetworkDriver driver = new MINANetworkDriver();
-
- Set<VERSION> supported = EnumSet.allOf(VERSION.class);
-
- if(exclude_0_10.contains(port))
- {
- supported.remove(VERSION.v0_10);
- }
-
- if(exclude_0_9_1.contains(port))
- {
- supported.remove(VERSION.v0_9_1);
- }
- if(exclude_0_9.contains(port))
- {
- supported.remove(VERSION.v0_9);
- }
- if(exclude_0_8.contains(port))
- {
- supported.remove(VERSION.v0_8);
- }
-
- MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(hostName, supported);
-
-
-
- driver.bind(port, new InetAddress[]{bindAddress}, protocolEngineFactory,
- serverConfig.getNetworkConfiguration(), null);
- ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, port),
- new QpidAcceptor(driver,"TCP"));
- CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
-
- }
-
- }
-
- if (serverConfig.getEnableSSL())
- {
- sslFactory = new SSLContextFactory(keystorePath, keystorePassword, certType);
- NetworkDriver driver = new MINANetworkDriver();
- driver.bind(serverConfig.getSSLPort(), new InetAddress[]{bindAddress},
- new AMQProtocolEngineFactory(), serverConfig.getNetworkConfiguration(), sslFactory);
- ApplicationRegistry.getInstance().addAcceptor(new InetSocketAddress(bindAddress, serverConfig.getSSLPort()),
- new QpidAcceptor(driver,"TCP"));
- CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", serverConfig.getSSLPort()));
- }
-
- CurrentActor.get().message(BrokerMessages.READY());
-
- }
- finally
- {
- // Startup is complete so remove the AR initialised Startup actor
- CurrentActor.remove();
+ options.setBind(bindAddr);
}
-
-
- }
-
- private void parsePortArray(Set<Integer> ports, String[] portStr)
- throws InitException
- {
+ String[] portStr = commandLine.getOptionValues(OPTION_PORT.getOpt());
if(portStr != null)
{
- for(int i = 0; i < portStr.length; i++)
+ parsePortArray(options, portStr, false);
+ for(ProtocolExclusion pe : ProtocolExclusion.values())
{
- try
- {
- ports.add(Integer.parseInt(portStr[i]));
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid port: " + portStr[i], e);
- }
+ parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
}
}
- }
- private void parsePortList(Set<Integer> output, List input)
- throws InitException
- {
- if(input != null)
+ String[] sslPortStr = commandLine.getOptionValues(OPTION_SSLPORT.getOpt());
+ if(sslPortStr != null)
{
- for(Object port : input)
+ parsePortArray(options, sslPortStr, true);
+ for(ProtocolExclusion pe : ProtocolExclusion.values())
{
- try
- {
- output.add(Integer.parseInt(String.valueOf(port)));
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid port: " + port, e);
- }
- }
- }
- }
-
- /**
- * Update the configuration data with the management port.
- * @param configuration
- * @param managementPort The string from the command line
- */
- private void updateManagementPort(ServerConfiguration configuration, String managementPort)
- {
- if (managementPort != null)
- {
- try
- {
- configuration.setJMXManagementPort(Integer.parseInt(managementPort));
- }
- catch (NumberFormatException e)
- {
- _logger.warn("Invalid management port: " + managementPort + " will use:" + configuration.getJMXManagementPort(), e);
+ parsePortArray(options, commandLine.getOptionValues(pe.getExcludeName()), pe);
}
}
+
+ startBroker(options);
}
- public static void main(String[] args)
+ protected void startBroker(final BrokerOptions options) throws Exception
{
- //if the -Dlog4j.configuration property has not been set, enable the init override
- //to stop Log4J wondering off and picking up the first log4j.xml/properties file it
- //finds from the classpath when we get the first Loggers
- if(System.getProperty("log4j.configuration") == null)
- {
- System.setProperty("log4j.defaultInitOverride", "true");
- }
-
- //now that the override status is know, we can instantiate the Loggers
- _logger = Logger.getLogger(Main.class);
-
- new Main(args);
+ Broker broker = new Broker();
+ broker.startup(options);
}
- private byte[] parseIP(String address) throws Exception
+ protected void shutdown(final int status)
{
- char[] literalBuffer = address.toCharArray();
- int byteCount = 0;
- int currByte = 0;
- byte[] ip = new byte[IPV4_ADDRESS_LENGTH];
- for (int i = 0; i < literalBuffer.length; i++)
- {
- char currChar = literalBuffer[i];
- if ((currChar >= '0') && (currChar <= '9'))
- {
- currByte = (currByte * 10) + (Character.digit(currChar, 10) & 0xFF);
- }
-
- if (currChar == IPV4_LITERAL_SEPARATOR || (i + 1 == literalBuffer.length))
- {
- ip[byteCount++] = (byte) currByte;
- currByte = 0;
- }
- }
-
- if (byteCount != 4)
- {
- throw new Exception("Invalid IP address: " + address);
- }
- return ip;
+ ApplicationRegistry.remove();
+ System.exit(status);
}
- private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
+ private static void parsePortArray(final BrokerOptions options,final Object[] ports,
+ final boolean ssl) throws InitException
{
- if (logConfigFile.exists() && logConfigFile.canRead())
+ if(ports != null)
{
- CurrentActor.get().message(BrokerMessages.LOG_CONFIG(logConfigFile.getAbsolutePath()));
-
- if (logWatchTime > 0)
+ for(int i = 0; i < ports.length; i++)
{
- System.out.println("log file " + logConfigFile.getAbsolutePath() + " will be checked for changes every "
- + logWatchTime + " seconds");
- // log4j expects the watch interval in milliseconds
try
{
- QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
- }
- catch (Exception e)
- {
- throw new InitException(e.getMessage(),e);
- }
- }
- else
- {
- try
- {
- QpidLog4JConfigurator.configure(logConfigFile.getPath());
+ if(ssl)
+ {
+ options.addSSLPort(Integer.parseInt(String.valueOf(ports[i])));
+ }
+ else
+ {
+ options.addPort(Integer.parseInt(String.valueOf(ports[i])));
+ }
}
- catch (Exception e)
+ catch (NumberFormatException e)
{
- throw new InitException(e.getMessage(),e);
+ throw new InitException("Invalid port: " + ports[i], e);
}
}
}
- else
- {
- System.err.println("Logging configuration error: unable to read file " + logConfigFile.getAbsolutePath());
- System.err.println("Using the fallback internal log4j.properties configuration");
+ }
- InputStream propsFile = this.getClass().getResourceAsStream("/log4j.properties");
- if(propsFile == null)
- {
- throw new IOException("Unable to load the fallback internal log4j.properties configuration file");
- }
- else
+ private static void parsePortArray(final BrokerOptions options, final Object[] ports,
+ final ProtocolExclusion excludedProtocol) throws InitException
+ {
+ if(ports != null)
+ {
+ for(int i = 0; i < ports.length; i++)
{
try
{
- Properties fallbackProps = new Properties();
- fallbackProps.load(propsFile);
- PropertyConfigurator.configure(fallbackProps);
+ options.addExcludedPort(excludedProtocol,
+ Integer.parseInt(String.valueOf(ports[i])));
}
- finally
+ catch (NumberFormatException e)
{
- propsFile.close();
+ throw new InitException("Invalid port for exclusion: " + ports[i], e);
}
}
}
}
-
- private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
- {
- LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
-
- blm.register();
- }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Fri Oct 21 14:42:12 2011
@@ -20,6 +20,8 @@
package org.apache.qpid.server.configuration;
+import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+
import java.io.File;
import java.util.Collections;
import java.util.HashMap;
@@ -37,33 +39,28 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.signal.SignalHandlerTask;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import org.apache.qpid.transport.NetworkDriverConfiguration;
-
-import sun.misc.Signal;
-import sun.misc.SignalHandler;
-public class ServerConfiguration extends ConfigurationPlugin implements SignalHandler
+public class ServerConfiguration extends ConfigurationPlugin
{
protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class);
// Default Configuration values
- public static final int DEFAULT_BUFFER_READ_LIMIT_SIZE = 262144;
- public static final int DEFAULT_BUFFER_WRITE_LIMIT_SIZE = 262144;
- public static final boolean DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED = false;
+ public static final int DEFAULT_BUFFER_SIZE = 262144;
public static final String DEFAULT_STATUS_UPDATES = "on";
public static final String SECURITY_CONFIG_RELOADED = "SECURITY CONFIGURATION RELOADED";
public static final int DEFAULT_FRAME_SIZE = 65536;
public static final int DEFAULT_PORT = 5672;
- public static final int DEFAULT_SSL_PORT = 8672;
+ public static final int DEFAULT_SSL_PORT = 5671;
public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
- public static final int DEFAULT_JMXPORT = 8999;
-
+ public static final int DEFAULT_JMXPORT_REGISTRYSERVER = 8999;
+ public static final int JMXPORT_CONNECTORSERVER_OFFSET = 100;
+
public static final String QPID_HOME = "QPID_HOME";
public static final String QPID_WORK = "QPID_WORK";
public static final String LIB_DIR = "lib";
@@ -75,19 +72,14 @@ public class ServerConfiguration extends
private File _configFile;
private File _vhostsFile;
- private Logger _log = Logger.getLogger(this.getClass());
-
- private ConfigurationManagementMBean _mbean;
-
// Map of environment variables to config items
private static final Map<String, String> envVarMap = new HashMap<String, String>();
// Configuration values to be read from the configuration file
//todo Move all properties to static values to ensure system testing can be performed.
- public static final String CONNECTOR_PROTECTIO_ENABLED = "connector.protectio.enabled";
- public static final String CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE = "connector.protectio.readBufferLimitSize";
- public static final String CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE = "connector.protectio.writeBufferLimitSize";
public static final String MGMT_CUSTOM_REGISTRY_SOCKET = "management.custom-registry-socket";
+ public static final String MGMT_JMXPORT_REGISTRYSERVER = "management.jmxport.registryServer";
+ public static final String MGMT_JMXPORT_CONNECTORSERVER = "management.jmxport.connectorServer";
public static final String STATUS_UPDATES = "status-updates";
public static final String ADVANCED_LOCALE = "advanced.locale";
@@ -95,9 +87,9 @@ public class ServerConfiguration extends
envVarMap.put("QPID_PORT", "connector.port");
envVarMap.put("QPID_ENABLEDIRECTBUFFERS", "advanced.enableDirectBuffers");
envVarMap.put("QPID_SSLPORT", "connector.ssl.port");
- envVarMap.put("QPID_NIO", "connector.qpidnio");
envVarMap.put("QPID_WRITEBIASED", "advanced.useWriteBiasedPool");
- envVarMap.put("QPID_JMXPORT", "management.jmxport");
+ envVarMap.put("QPID_JMXPORT_REGISTRYSERVER", MGMT_JMXPORT_REGISTRYSERVER);
+ envVarMap.put("QPID_JMXPORT_CONNECTORSERVER", MGMT_JMXPORT_CONNECTORSERVER);
envVarMap.put("QPID_FRAMESIZE", "advanced.framesize");
envVarMap.put("QPID_MSGAUTH", "security.msg-auth");
envVarMap.put("QPID_AUTOREGISTER", "auto_register");
@@ -131,7 +123,7 @@ public class ServerConfiguration extends
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explictly call {@link #initialise()} as this is done via the
+ * need to explicitly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param configurationURL
@@ -141,15 +133,26 @@ public class ServerConfiguration extends
{
this(parseConfig(configurationURL));
_configFile = configurationURL;
- try
+
+ SignalHandlerTask hupReparseTask = new SignalHandlerTask()
{
- Signal sig = new sun.misc.Signal("HUP");
- sun.misc.Signal.handle(sig, this);
- }
- catch (Exception e)
+ public void handle()
+ {
+ try
+ {
+ reparseConfigFileSecuritySections();
+ }
+ catch (ConfigurationException e)
+ {
+ _logger.error("Could not reload configuration file security sections", e);
+ }
+ }
+ };
+
+ if(!hupReparseTask.register("HUP"))
{
- _logger.error("Signal HUP not supported for OS: " + System.getProperty("os.name"));
- // We're on something that doesn't handle SIGHUP, how sad, Windows.
+ _logger.info("Unable to register Signal HUP handler to reload security configuration.");
+ _logger.info("Signal HUP not supported for this OS / JVM combination - " + SignalHandlerTask.getPlatformDescription());
}
}
@@ -166,7 +169,7 @@ public class ServerConfiguration extends
* Configuration Manager to be initialised in the Application Registry.
* <p>
* If using this ServerConfiguration via an ApplicationRegistry there is no
- * need to explictly call {@link #initialise()} as this is done via the
+ * need to explicitly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
* @param conf
@@ -205,7 +208,53 @@ public class ServerConfiguration extends
@Override
public void validateConfiguration() throws ConfigurationException
{
- //Currently doesn't do validation
+ // Support for security.jmx.access was removed when JMX access rights were incorporated into the main ACL.
+ // This ensures that users remove the element from their configuration file.
+
+ if (getListValue("security.jmx.access").size() > 0)
+ {
+ String message = "Validation error : security/jmx/access is no longer a supported element within the configuration xml."
+ + (_configFile == null ? "" : " Configuration file : " + _configFile);
+ throw new ConfigurationException(message);
+ }
+
+ if (getListValue("security.jmx.principal-database").size() > 0)
+ {
+ String message = "Validation error : security/jmx/principal-database is no longer a supported element within the configuration xml."
+ + (_configFile == null ? "" : " Configuration file : " + _configFile);
+ throw new ConfigurationException(message);
+ }
+
+ if (getListValue("security.principal-databases.principal-database(0).class").size() > 0)
+ {
+ String message = "Validation error : security/principal-databases is no longer supported within the configuration xml."
+ + (_configFile == null ? "" : " Configuration file : " + _configFile);
+ throw new ConfigurationException(message);
+ }
+
+ // QPID-3266. Tidy up housekeeping configuration option for scheduling frequency
+ if (contains("housekeeping.expiredMessageCheckPeriod"))
+ {
+ String message = "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod."
+ + (_configFile == null ? "" : " Configuration file : " + _configFile);
+ throw new ConfigurationException(message);
+ }
+
+ // QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration
+ // sections. For the moment, continue to understand both but log a deprecation warning for each lower-case 'keystore' key (one array element per key).
+ for (String key : new String[] {"management.ssl.keystorePath",
+ "management.ssl.keystorePassword",
+ "connector.ssl.keystorePath",
+ "connector.ssl.keystorePassword"})
+ {
+ if (contains(key))
+ {
+ final String deprecatedXpath = key.replaceAll("\\.", "/");
+ final String preferredXpath = deprecatedXpath.replaceAll("keystore", "keyStore");
+ _logger.warn("Validation warning: " + deprecatedXpath + " is deprecated and must be replaced by " + preferredXpath
+ + (_configFile == null ? "" : " Configuration file : " + _configFile));
+ }
+ }
}
/*
@@ -371,7 +420,7 @@ public class ServerConfiguration extends
public final static Configuration flatConfig(File file) throws ConfigurationException
{
// We have to override the interpolate methods so that
- // interpolation takes place accross the entirety of the
+ // interpolation takes place across the entirety of the
// composite configuration. Without doing this each
// configuration object only interpolates variables defined
// inside itself.
@@ -398,18 +447,6 @@ public class ServerConfiguration extends
return _configFile == null ? "" : _configFile.getAbsolutePath();
}
- public void handle(Signal arg0)
- {
- try
- {
- reparseConfigFileSecuritySections();
- }
- catch (ConfigurationException e)
- {
- _logger.error("Could not reload configuration file security sections", e);
- }
- }
-
public void reparseConfigFileSecuritySections() throws ConfigurationException
{
if (_configFile != null)
@@ -453,14 +490,24 @@ public class ServerConfiguration extends
return System.getProperty(QPID_HOME);
}
- public void setJMXManagementPort(int mport)
+ public void setJMXPortRegistryServer(int registryServerPort)
+ {
+ getConfig().setProperty(MGMT_JMXPORT_REGISTRYSERVER, registryServerPort);
+ }
+
+ public int getJMXPortRegistryServer()
{
- getConfig().setProperty("management.jmxport", mport);
+ return getIntValue(MGMT_JMXPORT_REGISTRYSERVER, DEFAULT_JMXPORT_REGISTRYSERVER);
}
- public int getJMXManagementPort()
+ public void setJMXPortConnectorServer(int connectorServerPort)
{
- return getIntValue("management.jmxport", DEFAULT_JMXPORT);
+ getConfig().setProperty(MGMT_JMXPORT_CONNECTORSERVER, connectorServerPort);
+ }
+
+ public int getJMXConnectorServerPort()
+ {
+ return getIntValue(MGMT_JMXPORT_CONNECTORSERVER, getJMXPortRegistryServer() + JMXPORT_CONNECTORSERVER_OFFSET);
}
public boolean getUseCustomRMISocketFactory()
@@ -503,58 +550,11 @@ public class ServerConfiguration extends
_virtualHosts.put(config.getName(), config);
}
- public List<String> getPrincipalDatabaseNames()
- {
- return getListValue("security.principal-databases.principal-database.name");
- }
-
- public List<String> getPrincipalDatabaseClass()
- {
- return getListValue("security.principal-databases.principal-database.class");
- }
-
- public List<String> getPrincipalDatabaseAttributeNames(int index)
- {
- String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.name";
- return getListValue(name);
- }
-
- public List<String> getPrincipalDatabaseAttributeValues(int index)
- {
- String name = "security.principal-databases.principal-database(" + index + ")." + "attributes.attribute.value";
- return getListValue(name);
- }
-
- public List<String> getManagementPrincipalDBs()
- {
- return getListValue("security.jmx.principal-database");
- }
-
- public List<String> getManagementAccessList()
- {
- return getListValue("security.jmx.access");
- }
-
public int getFrameSize()
{
return getIntValue("advanced.framesize", DEFAULT_FRAME_SIZE);
}
- public boolean getProtectIOEnabled()
- {
- return getBooleanValue(CONNECTOR_PROTECTIO_ENABLED, DEFAULT_BROKER_CONNECTOR_PROTECTIO_ENABLED);
- }
-
- public int getBufferReadLimit()
- {
- return getIntValue(CONNECTOR_PROTECTIO_READ_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_READ_LIMIT_SIZE);
- }
-
- public int getBufferWriteLimit()
- {
- return getIntValue(CONNECTOR_PROTECTIO_WRITE_BUFFER_LIMIT_SIZE, DEFAULT_BUFFER_WRITE_LIMIT_SIZE);
- }
-
public boolean getSynchedClocks()
{
return getBooleanValue("advanced.synced-clocks");
@@ -565,14 +565,10 @@ public class ServerConfiguration extends
return getBooleanValue("security.msg-auth");
}
- public String getJMXPrincipalDatabase()
- {
- return getStringValue("security.jmx.principal-database");
- }
-
public String getManagementKeyStorePath()
{
- return getStringValue("management.ssl.keyStorePath");
+ final String fallback = getStringValue("management.ssl.keystorePath");
+ return getStringValue("management.ssl.keyStorePath", fallback);
}
public boolean getManagementSSLEnabled()
@@ -582,7 +578,8 @@ public class ServerConfiguration extends
public String getManagementKeyStorePassword()
{
- return getStringValue("management.ssl.keyStorePassword");
+ final String fallback = getStringValue("management.ssl.keystorePassword");
+ return getStringValue("management.ssl.keyStorePassword", fallback);
}
public boolean getQueueAutoRegister()
@@ -650,14 +647,14 @@ public class ServerConfiguration extends
return getLongValue("flowResumeCapacity", getCapacity());
}
- public int getProcessors()
+ public int getConnectorProcessors()
{
return getIntValue("connector.processors", 4);
}
public List getPorts()
{
- return getListValue("connector.port", Collections.singletonList(DEFAULT_PORT));
+ return getListValue("connector.port", Collections.<Integer>singletonList(DEFAULT_PORT));
}
public List getPortExclude010()
@@ -682,17 +679,17 @@ public class ServerConfiguration extends
public String getBind()
{
- return getStringValue("connector.bind", "wildcard");
+ return getStringValue("connector.bind", WILDCARD_ADDRESS);
}
public int getReceiveBufferSize()
{
- return getIntValue("connector.socketReceiveBuffer", 32767);
+ return getIntValue("connector.socketReceiveBuffer", DEFAULT_BUFFER_SIZE);
}
public int getWriteBufferSize()
{
- return getIntValue("connector.socketWriteBuffer", 32767);
+ return getIntValue("connector.socketWriteBuffer", getIntValue("connector.socketSendBuffer", DEFAULT_BUFFER_SIZE)); // legacy key wins; falls back to the socketSendBuffer element used by the shipped config.xml
}
public boolean getTcpNoDelay()
@@ -715,31 +712,28 @@ public class ServerConfiguration extends
return getBooleanValue("connector.ssl.sslOnly");
}
- public int getSSLPort()
+ public List getSSLPorts()
{
- return getIntValue("connector.ssl.port", DEFAULT_SSL_PORT);
+ return getListValue("connector.ssl.port", Collections.<Integer>singletonList(DEFAULT_SSL_PORT));
}
- public String getKeystorePath()
+ public String getConnectorKeyStorePath()
{
- return getStringValue("connector.ssl.keystorePath", "none");
+ final String fallback = getStringValue("connector.ssl.keystorePath"); // pre-0.13 brokers supported this name.
+ return getStringValue("connector.ssl.keyStorePath", fallback);
}
- public String getKeystorePassword()
+ public String getConnectorKeyStorePassword()
{
- return getStringValue("connector.ssl.keystorePassword", "none");
+ final String fallback = getStringValue("connector.ssl.keystorePassword"); // pre-0.13 brokers supported this name.
+ return getStringValue("connector.ssl.keyStorePassword", fallback);
}
- public String getCertType()
+ public String getConnectorCertType()
{
return getStringValue("connector.ssl.certType", "SunX509");
}
- public boolean getQpidNIO()
- {
- return getBooleanValue("connector.qpidnio");
- }
-
public boolean getUseBiasedWrites()
{
return getBooleanValue("advanced.useWriteBiasedPool");
@@ -755,69 +749,44 @@ public class ServerConfiguration extends
getConfig().setProperty("virtualhosts.default", vhost);
}
- public void setHousekeepingExpiredMessageCheckPeriod(long value)
+ public void setHousekeepingCheckPeriod(long value)
{
- getConfig().setProperty("housekeeping.expiredMessageCheckPeriod", value);
+ getConfig().setProperty("housekeeping.checkPeriod", value);
}
public long getHousekeepingCheckPeriod()
{
- return getLongValue("housekeeping.checkPeriod",
- getLongValue("housekeeping.expiredMessageCheckPeriod",
- DEFAULT_HOUSEKEEPING_PERIOD));
+ return getLongValue("housekeeping.checkPeriod", DEFAULT_HOUSEKEEPING_PERIOD);
}
- public NetworkDriverConfiguration getNetworkConfiguration()
+ public long getStatisticsSamplePeriod()
{
- return new NetworkDriverConfiguration()
- {
-
- public Integer getTrafficClass()
- {
- return null;
- }
-
- public Boolean getTcpNoDelay()
- {
- // Can't call parent getTcpNoDelay since it just calls this one
- return getBooleanValue("connector.tcpNoDelay", true);
- }
-
- public Integer getSoTimeout()
- {
- return null;
- }
-
- public Integer getSoLinger()
- {
- return null;
- }
+ return getConfig().getLong("statistics.sample.period", 5000L);
+ }
- public Integer getSendBufferSize()
- {
- return getBufferWriteLimit();
- }
+ public boolean isStatisticsGenerationBrokerEnabled()
+ {
+ return getConfig().getBoolean("statistics.generation.broker", false);
+ }
- public Boolean getReuseAddress()
- {
- return null;
- }
+ public boolean isStatisticsGenerationVirtualhostsEnabled()
+ {
+ return getConfig().getBoolean("statistics.generation.virtualhosts", false);
+ }
- public Integer getReceiveBufferSize()
- {
- return getBufferReadLimit();
- }
+ public boolean isStatisticsGenerationConnectionsEnabled()
+ {
+ return getConfig().getBoolean("statistics.generation.connections", false);
+ }
- public Boolean getOOBInline()
- {
- return null;
- }
+ public long getStatisticsReportingPeriod()
+ {
+ return getConfig().getLong("statistics.reporting.period", 0L);
+ }
- public Boolean getKeepAlive()
- {
- return null;
- }
- };
+ public boolean isStatisticsReportResetEnabled()
+ {
+ return getConfig().getBoolean("statistics.reporting.reset", false);
}
public int getMaxChannelCount()
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Fri Oct 21 14:42:12 2011
@@ -86,9 +86,9 @@ public class VirtualHostConfiguration ex
return _name;
}
- public long getHousekeepingExpiredMessageCheckPeriod()
+ public long getHousekeepingCheckPeriod()
{
- return getLongValue("housekeeping.expiredMessageCheckPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
+ return getLongValue("housekeeping.checkPeriod", ApplicationRegistry.getInstance().getConfiguration().getHousekeepingCheckPeriod());
}
public String getAuthenticationDatabase()
@@ -306,11 +306,45 @@ public class VirtualHostConfiguration ex
@Override
public void validateConfiguration() throws ConfigurationException
{
- //Currently doesn't do validation
+ // QPID-3249. Specifying the authentication name at vhost level is no longer supported.
+ if (getListValue("security.authentication.name").size() > 0)
+ {
+ String message = "Validation error : security/authentication/name is no longer a supported element within the configuration xml."
+ + " It appears in virtual host definition : " + _name;
+ throw new ConfigurationException(message);
+ }
+
+ // QPID-3266. Tidy up housekeeping configuration option for scheduling frequency
+ if (contains("housekeeping.expiredMessageCheckPeriod"))
+ {
+ String message = "Validation error : housekeeping/expiredMessageCheckPeriod must be replaced by housekeeping/checkPeriod."
+ + " It appears in virtual host definition : " + _name;
+ throw new ConfigurationException(message);
+ }
}
public int getHouseKeepingThreadCount()
{
return getIntValue("housekeeping.poolSize", Runtime.getRuntime().availableProcessors());
}
+
+ public long getTransactionTimeoutOpenWarn()
+ {
+ return getLongValue("transactionTimeout.openWarn", 0L);
+ }
+
+ public long getTransactionTimeoutOpenClose()
+ {
+ return getLongValue("transactionTimeout.openClose", 0L);
+ }
+
+ public long getTransactionTimeoutIdleWarn()
+ {
+ return getLongValue("transactionTimeout.idleWarn", 0L);
+ }
+
+ public long getTransactionTimeoutIdleClose()
+ {
+ return getLongValue("transactionTimeout.idleClose", 0L);
+ }
}
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/configuration/plugins/ConfigurationPlugin.java Fri Oct 21 14:42:12 2011
@@ -24,6 +24,7 @@ import org.apache.commons.configuration.
import org.apache.log4j.Logger;
import org.apache.qpid.server.configuration.ConfigurationManager;
import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import java.util.Collections;
import java.util.HashMap;
@@ -138,10 +139,28 @@ public abstract class ConfigurationPlugi
}
}
+ offerRemainingConfigurationToOtherPlugins(path, configuration, elements);
+
+ validateConfiguration();
+ }
+
+ private void offerRemainingConfigurationToOtherPlugins(String path,
+ Configuration configuration, Set<String> elements) throws ConfigurationException
+ {
+ final IApplicationRegistry appRegistry = safeGetApplicationRegistryInstance();
+
+ if (appRegistry == null)
+ {
+ // We see this happen during shutdown due to asynchronous reconfig using IO threads.
+ // Need to move the responsibility for offering configuration to another class.
+ _logger.info("Cannot offer remaining config to other plugins, can't find app registry");
+ return;
+ }
+
+ final ConfigurationManager configurationManager = appRegistry.getConfigurationManager();
// Process the elements in the configuration
for (String element : elements)
{
- ConfigurationManager configurationManager = ApplicationRegistry.getInstance().getConfigurationManager();
Configuration handled = element.length() == 0 ? configuration : configuration.subset(element);
String configurationElement = element;
@@ -162,8 +181,18 @@ public abstract class ConfigurationPlugi
_pluginConfiguration.put(plugin.getClass().getName(), plugin);
}
}
+ }
- validateConfiguration();
+ private IApplicationRegistry safeGetApplicationRegistryInstance()
+ {
+ try
+ {
+ return ApplicationRegistry.getInstance();
+ }
+ catch (IllegalStateException ise)
+ {
+ return null;
+ }
}
/** Helper method to print out list of keys in a {@link Configuration}. */
Modified: qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1187375&r1=1187374&r2=1187375&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/branches/QPID-2519/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Fri Oct 21 14:42:12 2011
@@ -20,19 +20,21 @@
*/
package org.apache.qpid.server.connection;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQProtocolEngine;
+import org.apache.qpid.transport.TransportException;
public class ConnectionRegistry implements IConnectionRegistry, Closeable
{
- private List<AMQProtocolSession> _registry = new CopyOnWriteArrayList<AMQProtocolSession>();
+ private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
@@ -40,44 +42,46 @@ public class ConnectionRegistry implemen
{
// None required
}
-
- public void expireClosedChannels()
- {
- for (AMQProtocolSession connection : _registry)
- {
- connection.closeIfLingeringClosedChannels();
- }
- }
/** Close all of the currently open connections. */
public void close()
{
+ _logger.debug("Closing connection registry :" + _registry.size() + " connections.");
while (!_registry.isEmpty())
{
- AMQProtocolSession connection = _registry.get(0);
+ AMQConnectionModel connection = _registry.get(0);
+ closeConnection(connection, AMQConstant.CONNECTION_FORCED, "Broker is shutting down");
+ }
+ }
- try
- {
- connection.closeConnection(0, new AMQConnectionException(AMQConstant.INTERNAL_ERROR, "Broker is shutting down",
- 0, 0,
- connection.getProtocolOutputConverter().getProtocolMajorVersion(),
- connection.getProtocolOutputConverter().getProtocolMinorVersion(),
- (Throwable) null), true);
- }
- catch (AMQException e)
- {
- _logger.warn("Error closing connection:" + e.getMessage());
- }
+ public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
+ {
+ try
+ {
+ connection.close(cause, message);
+ }
+ catch (TransportException e)
+ {
+ _logger.warn("Error closing connection:" + e.getMessage());
+ }
+ catch (AMQException e)
+ {
+ _logger.warn("Error closing connection:" + e.getMessage());
}
}
- public void registerConnection(AMQProtocolSession connnection)
+ public void registerConnection(AMQConnectionModel connnection)
{
_registry.add(connnection);
}
- public void deregisterConnection(AMQProtocolSession connnection)
+ public void deregisterConnection(AMQConnectionModel connnection)
{
_registry.remove(connnection);
}
+
+ public List<AMQConnectionModel> getConnections()
+ {
+ return new ArrayList<AMQConnectionModel>(_registry);
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org