You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC
svn commit: r1451244 [24/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...
Modified: qpid/branches/asyncstore/java/broker/etc/broker_example.acl
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/etc/broker_example.acl?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/etc/broker_example.acl (original)
+++ qpid/branches/asyncstore/java/broker/etc/broker_example.acl Thu Feb 28 16:14:30 2013
@@ -19,24 +19,20 @@
### EXAMPLE ACL V2 FILE
### NOTE: Rules are considered from top to bottom, and the first matching rule governs the decision.
-
-### DEFINE GROUPS ###
-
-#Define a 'messaging-users' group with users 'client' and 'server' in it
-GROUP messaging-users client server
-
-#Define a group for management web console users
-GROUP webadmins webadmin
+### Rules may refer to users or groups. Groups are currently defined in the etc/groups file.
### JMX MANAGEMENT ####
-# Allow everyone to perform read operations on the ServerInformation mbean
-# This is used for items such as querying the management API and broker release versions.
-ACL ALLOW ALL ACCESS METHOD component="ServerInformation"
-
-# Allow 'admin' all management operations. To reduce log file noise, only non-read-only operations are logged.
-ACL ALLOW admin ACCESS METHOD
-ACL ALLOW-LOG admin ALL METHOD
+# To use JMX management, first give the user/group ACCESS MANAGEMENT permission
+ACL ALLOW administrators ACCESS MANAGEMENT
+ACL ALLOW guest ACCESS MANAGEMENT
+
+# Allow guest to perform read operations on the ServerInformation mbean
+ACL ALLOW guest ACCESS METHOD component="ServerInformation"
+
+# Allow 'administrators' all management operations. To reduce log file noise, only non-read-only operations are logged.
+ACL ALLOW administrators ACCESS METHOD
+ACL ALLOW-LOG administrators ALL METHOD
# Allow 'guest' to view logger levels, and use getter methods on LoggingManagement
ACL ALLOW guest ACCESS METHOD component="LoggingManagement" name="viewEffectiveRuntimeLoggerLevels"
@@ -49,17 +45,61 @@ ACL DENY-LOG ALL ACCESS METHOD component
ACL DENY-LOG ALL ACCESS METHOD component="ConfigurationManagement"
ACL DENY-LOG ALL ACCESS METHOD component="LoggingManagement"
-# Allow everyone to perform all read operations (using ALLOW rather than ALLOW-LOG to reduce log file noise)
-# on the mbeans not listed in the DENY rules above
+# Allow everyone to perform all read operations on the mbeans not listed in the DENY rules above
ACL ALLOW ALL ACCESS METHOD
+### WEB MANAGEMENT ####
+
+# To use web management, first give the user/group ACCESS MANAGEMENT permission
+ACL ALLOW webadmins ACCESS MANAGEMENT
+
+# ACL for web management console admins
+# All rules below are required for console admin users
+# to perform create/update/delete operations
+ACL ALLOW-LOG webadmins CREATE QUEUE
+ACL ALLOW-LOG webadmins DELETE QUEUE
+ACL ALLOW-LOG webadmins PURGE QUEUE
+ACL ALLOW-LOG webadmins CREATE EXCHANGE
+ACL ALLOW-LOG webadmins DELETE EXCHANGE
+ACL ALLOW-LOG webadmins BIND EXCHANGE
+ACL ALLOW-LOG webadmins UNBIND EXCHANGE
+ACL ALLOW-LOG webadmins CREATE GROUP
+ACL ALLOW-LOG webadmins DELETE GROUP
+ACL ALLOW-LOG webadmins UPDATE GROUP
+ACL ALLOW-LOG webadmins CREATE USER
+ACL ALLOW-LOG webadmins DELETE USER
+ACL ALLOW-LOG webadmins UPDATE USER
+
+ACL ALLOW-LOG webadmins UPDATE METHOD
+
+# at the moment only the following UPDATE METHOD rules are supported by web management console
+#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="moveMessages"
+#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="copyMessages"
+#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="deleteMessages"
+
### MESSAGING ###
+# The 'ACCESS VIRTUALHOST' rules below apply to messaging operations (as opposed to management operations)
+
+# Firewall examples
+
+# Deny access to all users from *.example.company1.com and *.example.company2.com
+ACL DENY-LOG all ACCESS VIRTUALHOST from_hostname=".*\.example\.company1.com,.*\.example\.company2.com"
-#Example permissions for request-response based messaging.
+# Deny access to all users in the IP ranges 192.168.1.0-192.168.1.255 and 192.168.2.0-192.168.2.255,
+# using the notation specified in RFC 4632, "Classless Inter-domain Routing (CIDR)"
+ACL DENY-LOG messaging-users ACCESS VIRTUALHOST from_network="192.168.1.0/24,192.168.2.0/24"
-#Allow 'messaging-users' group to connect to the virtualhost
+# Deny access to all users in the IP ranges 192.169.1.0-192.169.1.255 and 192.169.2.0-192.169.2.255,
+# using wildcard notation.
+ACL DENY-LOG messaging-users ACCESS VIRTUALHOST from_network="192.169.1.*,192.169.2.*"
+
+# Allow 'messaging-users' group to connect to all virtualhosts
ACL ALLOW-LOG messaging-users ACCESS VIRTUALHOST
+# Deny messaging-users management
+ACL DENY-LOG messaging-users ACCESS MANAGEMENT
+
+
# Client side
# Allow the 'client' user to publish requests to the request queue and create, consume from, and delete temporary reply queues.
ACL ALLOW-LOG client CREATE QUEUE temporary="true"
@@ -77,24 +117,8 @@ ACL ALLOW-LOG server CONSUME QUEUE name=
ACL ALLOW-LOG server BIND EXCHANGE
ACL ALLOW-LOG server PUBLISH EXCHANGE name="amq.direct" routingKey="TempQueue*"
-# ACL for web management console admins
-# All rules below are required for console admin users
-# to perform create/update/delete operations
-ACL ALLOW-LOG webadmins CREATE QUEUE
-ACL ALLOW-LOG webadmins DELETE QUEUE
-ACL ALLOW-LOG webadmins PURGE QUEUE
-ACL ALLOW-LOG webadmins CREATE EXCHANGE
-ACL ALLOW-LOG webadmins DELETE EXCHANGE
-ACL ALLOW-LOG webadmins BIND EXCHANGE
-ACL ALLOW-LOG webadmins UNBIND EXCHANGE
-ACL ALLOW-LOG webadmins UPDATE METHOD
-
-# at the moment only the following UPDATE METHOD rules are supported by web management console
-#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="moveMessages"
-#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="copyMessages"
-#ACL ALLOW-LOG webadmins UPDATE METHOD component="VirtualHost.Queue" name="deleteMessages"
### DEFAULT ###
-#Deny all users from performing all operations
+# Deny all users from performing all operations
ACL DENY-LOG all all
Modified: qpid/branches/asyncstore/java/broker/etc/log4j.xml
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/etc/log4j.xml?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/etc/log4j.xml (original)
+++ qpid/branches/asyncstore/java/broker/etc/log4j.xml Thu Feb 28 16:14:30 2013
@@ -68,7 +68,7 @@
<param name="backupFilesToPath" value="${QPID_WORK}/backup/log"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
+ <param name="ConversionPattern" value="%d %-5p [%t] (%c{2}) - %m%n"/>
</layout>
</appender>
@@ -77,20 +77,20 @@
<param name="Append" value="false"/>
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
+ <param name="ConversionPattern" value="%d %-5p [%t] (%c{2}) - %m%n"/>
</layout>
</appender>
<appender class="org.apache.log4j.ConsoleAppender" name="STDOUT">
<layout class="org.apache.log4j.PatternLayout">
- <param name="ConversionPattern" value="%d %-5p [%t] (%F:%L) - %m%n"/>
+ <param name="ConversionPattern" value="%d %-5p [%t] (%c{2}) - %m%n"/>
</layout>
</appender>
<!-- Provide warnings to standard output -->
- <category additivity="true" name="org.apache.qpid">
- <priority value="warn"/>
- </category>
+ <logger additivity="true" name="org.apache.qpid">
+ <level value="warn"/>
+ </logger>
<!-- Enable info messages for the status-logging hierarchy -->
<logger additivity="true" name="qpid.message">
@@ -108,21 +108,14 @@
<level value="info"/>
</logger>
- <!-- Examples of additional logging settings -->
- <!-- Used to generate extra debug. See debug.log4j.xml -->
-
- <!--<category additivity="true" name="org.apache.qpid.server.store">
- <priority value="debug"/>
- </category-->
-
<!-- Set the commons logging that the XML parser uses to WARN, it is very chatty at debug -->
<logger name="org.apache.commons">
- <level value="WARN"/>
+ <level value="warn"/>
</logger>
<!-- Log all info events to file -->
<root>
- <priority value="info"/>
+ <level value="info"/>
<appender-ref ref="FileAppender"/>
<!--appender-ref ref="ArchivingFileAppender"/-->
</root>
Modified: qpid/branches/asyncstore/java/broker/src/main/java/broker.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/broker.bnd?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/broker.bnd (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/broker.bnd Thu Feb 28 16:14:30 2013
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.19.0
+ver: 0.21.0
Bundle-SymbolicName: qpid-broker
Bundle-Version: ${ver}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Thu Feb 28 16:14:30 2013
@@ -51,13 +51,10 @@ import org.apache.qpid.framing.MethodReg
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.ConnectionConfig;
-import org.apache.qpid.server.configuration.SessionConfig;
-import org.apache.qpid.server.configuration.SessionConfigType;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.Pre0_10CreditManager;
@@ -75,7 +72,6 @@ import org.apache.qpid.server.message.Me
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.protocol.AMQProtocolEngine;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -83,7 +79,6 @@ import org.apache.qpid.server.queue.Base
import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.IncomingMessage;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
@@ -93,19 +88,19 @@ import org.apache.qpid.server.subscripti
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
+import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.TransportException;
-public class AMQChannel implements SessionConfig, AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
+public class AMQChannel implements AMQSessionModel, AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
private static final Logger _logger = Logger.getLogger(AMQChannel.class);
- private static final boolean MSG_AUTH =
- ApplicationRegistry.getInstance().getConfiguration().getMsgAuth();
-
+ //TODO use Broker property to configure message authorization requirements
+ private boolean _messageAuthorizationRequired = Boolean.getBoolean(BrokerProperties.PROPERTY_MSG_AUTH);
private final int _channelId;
@@ -118,7 +113,7 @@ public class AMQChannel implements Sessi
*/
private long _deliveryTag = 0;
- /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */
+ /** A channel has a default queue (the last declared) that is used when no queue name is explicitly set */
private AMQQueue _defaultQueue;
/** This tag is unique per subscription to a queue. The server returns this in response to a basic.consume request. */
@@ -151,7 +146,6 @@ public class AMQChannel implements Sessi
private final AtomicLong _txnCommits = new AtomicLong(0);
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final AtomicLong _txnUpdateTime = new AtomicLong(0);
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
@@ -169,12 +163,12 @@ public class AMQChannel implements Sessi
private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
- private final UUID _qmfId;
private long _createTime = System.currentTimeMillis();
private final ClientDeliveryMethod _clientDeliveryMethod;
private final TransactionTimeoutHelper _transactionTimeoutHelper;
+ private final UUID _id = UUID.randomUUID();
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
@@ -184,30 +178,36 @@ public class AMQChannel implements Sessi
_actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
_logSubject = new ChannelLogSubject(this);
- _qmfId = getConfigStore().createId();
_actor.message(ChannelMessages.CREATE());
- getConfigStore().addConfiguredObject(this);
-
_messageStore = messageStore;
// by default the session is non-transactional
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
- _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
-
- _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
- }
+ _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
- public ConfigStore getConfigStore()
- {
- return getVirtualHost().getConfigStore();
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
+ {
+ @Override
+ public void doTimeoutAction(String reason) throws AMQException
+ {
+ closeConnection(reason);
+ }
+ });
}
/** Sets this channel to be part of a local transaction */
public void setLocalTransactional()
{
- _transaction = new LocalTransaction(_messageStore);
+ _transaction = new LocalTransaction(_messageStore, new ActivityTimeAccessor()
+ {
+ @Override
+ public long getActivityTime()
+ {
+ return _session.getLastReceivedTime();
+ }
+ });
_txnStarts.incrementAndGet();
}
@@ -221,12 +221,6 @@ public class AMQChannel implements Sessi
sync();
}
-
- public boolean inTransaction()
- {
- return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
- }
-
private void incrementOutstandingTxnsIfNecessary()
{
if(isTransactional())
@@ -247,11 +241,6 @@ public class AMQChannel implements Sessi
}
}
- public Long getTxnStarts()
- {
- return _txnStarts.get();
- }
-
public Long getTxnCommits()
{
return _txnCommits.get();
@@ -369,9 +358,8 @@ public class AMQChannel implements Sessi
}
});
- _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues), getProtocolSession().getLastReceivedTime());
+ _transaction.enqueue(destinationQueues, _currentMessage, new MessageDeliveryAction(_currentMessage, destinationQueues));
incrementOutstandingTxnsIfNecessary();
- updateTransactionalActivity();
_currentMessage.getStoredMessage().flushToStore();
}
}
@@ -396,7 +384,7 @@ public class AMQChannel implements Sessi
if (_logger.isDebugEnabled())
{
- _logger.debug(debugIdentity() + "Content body received on channel " + _channelId);
+ _logger.debug(debugIdentity() + " content body received on channel " + _channelId);
}
try
@@ -556,9 +544,6 @@ public class AMQChannel implements Sessi
{
_logger.error("Caught TransportException whilst attempting to requeue:" + e);
}
-
- getConfigStore().removeConfiguredObject(this);
-
}
private void unsubscribeAllConsumers() throws AMQException
@@ -860,7 +845,6 @@ public class AMQChannel implements Sessi
{
Collection<QueueEntry> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
- updateTransactionalActivity();
}
private Collection<QueueEntry> getAckedMessages(long deliveryTag, boolean multiple)
@@ -1054,19 +1038,6 @@ public class AMQChannel implements Sessi
}
}
-
-
- }
-
- /**
- * Update last transaction activity timestamp
- */
- private void updateTransactionalActivity()
- {
- if (isTransactional())
- {
- _txnUpdateTime.set(getProtocolSession().getLastReceivedTime());
- }
}
public String toString()
@@ -1149,10 +1120,16 @@ public class AMQChannel implements Sessi
? ((BasicContentHeaderProperties) header.getProperties()).getUserId()
: null;
- return (!MSG_AUTH || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
+ return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
}
+ @Override
+ public UUID getId()
+ {
+ return _id;
+ }
+
public AMQConnectionModel getConnectionModel()
{
return _session;
@@ -1168,6 +1145,12 @@ public class AMQChannel implements Sessi
return _logSubject;
}
+ @Override
+ public int compareTo(AMQSessionModel o)
+ {
+ return getId().compareTo(o.getId());
+ }
+
private class MessageDeliveryAction implements ServerTransaction.Action
{
private IncomingMessage _incommingMessage;
@@ -1221,11 +1204,6 @@ public class AMQChannel implements Sessi
// TODO
throw new RuntimeException(e);
}
-
-
-
-
-
}
public void onRollback()
@@ -1375,7 +1353,6 @@ public class AMQChannel implements Sessi
public void onRollback()
{
- //To change body of implemented methods use File | Settings | File Templates.
}
}
@@ -1469,97 +1446,24 @@ public class AMQChannel implements Sessi
return getProtocolSession().getVirtualHost();
}
-
- public ConfiguredObject getParent()
- {
- return getVirtualHost();
- }
-
- public SessionConfigType getConfigType()
- {
- return SessionConfigType.getInstance();
- }
-
public int getChannel()
{
return getChannelId();
}
- public boolean isAttached()
- {
- return true;
- }
-
- public long getDetachedLifespan()
- {
- return 0;
- }
-
- public ConnectionConfig getConnectionConfig()
- {
- return (AMQProtocolEngine)getProtocolSession();
- }
-
- public Long getExpiryTime()
- {
- return null;
- }
-
- public Long getMaxClientRate()
- {
- return null;
- }
-
public boolean isDurable()
{
return false;
}
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
- public String getSessionName()
- {
- return getConnectionConfig().getAddress() + "/" + getChannelId();
- }
-
public long getCreateTime()
{
return _createTime;
}
- public void mgmtClose() throws AMQException
- {
- _session.mgmtCloseChannel(_channelId);
- }
-
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- if (inTransaction())
- {
- long currentTime = System.currentTimeMillis();
- long openTime = currentTime - _transaction.getTransactionStartTime();
- long idleTime = currentTime - _txnUpdateTime.get();
-
- _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
- TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
- {
- closeConnection("Idle transaction timed out");
- return;
- }
-
- _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
- TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
- if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
- {
- closeConnection("Open transaction timed out");
- return;
- }
- }
+ _transactionTimeoutHelper.checkIdleOrOpenTimes(_transaction, openWarn, openClose, idleWarn, idleClose);
}
/**
@@ -1637,6 +1541,11 @@ public class AMQChannel implements Sessi
public void sync()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("sync() called on channel " + debugIdentity());
+ }
+
AsyncCommand cmd;
while((cmd = _unfinishedCommandsQueue.poll()) != null)
{
@@ -1674,16 +1583,6 @@ public class AMQChannel implements Sessi
_action.postCommit();
_action = null;
}
-
- boolean isReadyForCompletion()
- {
- return _future.isComplete();
- }
- }
-
- public int compareTo(AMQSessionModel session)
- {
- return getQMFId().compareTo(session.getQMFId());
}
@Override
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java Thu Feb 28 16:14:30 2013
@@ -23,37 +23,31 @@ package org.apache.qpid.server;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.*;
-import javax.net.ssl.SSLContext;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.configuration.BrokerConfigurationStoreCreator;
+import org.apache.qpid.server.configuration.store.ManagementModeStoreHandler;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
-import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.log4j.LoggingFacade;
+import org.apache.qpid.server.logging.log4j.LoggingManagementFacade;
import org.apache.qpid.server.logging.messages.BrokerMessages;
-import org.apache.qpid.server.protocol.AmqpProtocolVersion;
-import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
-import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.qpid.ssl.SSLContextFactory;
-import org.apache.qpid.transport.NetworkTransportConfiguration;
-import org.apache.qpid.transport.network.IncomingNetworkTransport;
-import org.apache.qpid.transport.network.Transport;
-
-import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
+import org.apache.qpid.server.registry.IApplicationRegistry;
public class Broker
{
private static final Logger LOGGER = Logger.getLogger(Broker.class);
private volatile Thread _shutdownHookThread;
+ private volatile IApplicationRegistry _applicationRegistry;
protected static class InitException extends RuntimeException
{
@@ -73,7 +67,17 @@ public class Broker
}
finally
{
- ApplicationRegistry.remove();
+ try
+ {
+ if (_applicationRegistry != null)
+ {
+ _applicationRegistry.close();
+ }
+ }
+ finally
+ {
+ clearAMQShortStringCache();
+ }
}
}
@@ -84,274 +88,76 @@ public class Broker
public void startup(final BrokerOptions options) throws Exception
{
+ CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
try
{
- CurrentActor.set(new BrokerActor(new SystemOutMessageLogger()));
startupImpl(options);
addShutdownHook();
}
finally
{
- CurrentActor.remove();
+ try
+ {
+ CurrentActor.remove();
+ }
+ finally
+ {
+ clearAMQShortStringCache();
+ }
}
}
private void startupImpl(final BrokerOptions options) throws Exception
{
- final String qpidHome = options.getQpidHome();
- final File configFile = getConfigFile(options.getConfigFile(),
- BrokerOptions.DEFAULT_CONFIG_FILE, qpidHome, true);
-
- CurrentActor.get().message(BrokerMessages.CONFIG(configFile.getAbsolutePath()));
-
- File logConfigFile = getConfigFile(options.getLogConfigFile(),
- BrokerOptions.DEFAULT_LOG_CONFIG_FILE, qpidHome, false);
-
- configureLogging(logConfigFile, options.getLogWatchFrequency());
+ final String qpidHome = System.getProperty(BrokerProperties.PROPERTY_QPID_HOME);
+ String storeLocation = options.getConfigurationStoreLocation();
+ String storeType = options.getConfigurationStoreType();
- ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext());
- ServerConfiguration serverConfig = config.getConfiguration();
- if (options.getQpidWork() != null)
+ if (storeLocation == null)
{
- serverConfig.setQpidWork(options.getQpidWork());
- }
- if (options.getQpidHome() != null)
- {
- serverConfig.setQpidHome(options.getQpidHome());
- }
- updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer());
-
- ApplicationRegistry.initialise(config);
-
- // We have already loaded the BrokerMessages class by this point so we
- // need to refresh the locale setting incase we had a different value in
- // the configuration.
- BrokerMessages.reload();
-
- // AR.initialise() sets and removes its own actor so we now need to set the actor
- // for the remainder of the startup, and the default actor if the stack is empty
- CurrentActor.set(new BrokerActor(config.getCompositeStartupMessageLogger()));
- CurrentActor.setDefault(new BrokerActor(config.getRootMessageLogger()));
- GenericActor.setDefaultMessageLogger(config.getRootMessageLogger());
-
- try
- {
- Set<Integer> ports = new HashSet<Integer>(options.getPorts());
- if(ports.isEmpty())
- {
- parsePortList(ports, serverConfig.getPorts());
- }
-
- Set<Integer> sslPorts = new HashSet<Integer>(options.getSSLPorts());
- if(sslPorts.isEmpty())
- {
- parsePortList(sslPorts, serverConfig.getSSLPorts());
- }
-
- //1-0 excludes and includes
- Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0));
- if(exclude_1_0.isEmpty())
- {
- parsePortList(exclude_1_0, serverConfig.getPortExclude10());
- }
-
- Set<Integer> include_1_0 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v1_0));
- if(include_1_0.isEmpty())
- {
- parsePortList(include_1_0, serverConfig.getPortInclude10());
- }
-
- //0-10 excludes and includes
- Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10));
- if(exclude_0_10.isEmpty())
- {
- parsePortList(exclude_0_10, serverConfig.getPortExclude010());
- }
-
- Set<Integer> include_0_10 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_10));
- if(include_0_10.isEmpty())
- {
- parsePortList(include_0_10, serverConfig.getPortInclude010());
- }
-
- //0-9-1 excludes and includes
- Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1));
- if(exclude_0_9_1.isEmpty())
+ String qpidWork = System.getProperty(BrokerProperties.PROPERTY_QPID_WORK);
+ if (qpidWork == null)
{
- parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
+ qpidWork = new File(System.getProperty("user.dir"), "work").getAbsolutePath();
}
-
- Set<Integer> include_0_9_1 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9_1));
- if(include_0_9_1.isEmpty())
- {
- parsePortList(include_0_9_1, serverConfig.getPortInclude091());
- }
-
- //0-9 excludes and includes
- Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9));
- if(exclude_0_9.isEmpty())
- {
- parsePortList(exclude_0_9, serverConfig.getPortExclude09());
- }
-
- Set<Integer> include_0_9 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9));
- if(include_0_9.isEmpty())
- {
- parsePortList(include_0_9, serverConfig.getPortInclude09());
- }
-
- //0-8 excludes and includes
- Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8));
- if(exclude_0_8.isEmpty())
- {
- parsePortList(exclude_0_8, serverConfig.getPortExclude08());
- }
-
- Set<Integer> include_0_8 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_8));
- if(include_0_8.isEmpty())
- {
- parsePortList(include_0_8, serverConfig.getPortInclude08());
- }
-
- String bindAddr = options.getBind();
- if (bindAddr == null)
- {
- bindAddr = serverConfig.getBind();
- }
-
- InetAddress bindAddress;
- if (bindAddr.equals(WILDCARD_ADDRESS))
- {
- bindAddress = null;
- }
- else
- {
- bindAddress = InetAddress.getByName(bindAddr);
- }
-
- final AmqpProtocolVersion defaultSupportedProtocolReply = serverConfig.getDefaultSupportedProtocolReply();
-
- if (!serverConfig.getSSLOnly())
- {
- for(int port : ports)
- {
- final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port);
-
- final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
- include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8,serverConfig);
-
- final NetworkTransportConfiguration settings =
- new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
-
- final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
- final MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply);
-
- transport.accept(settings, protocolEngineFactory, null);
-
- ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
- new QpidAcceptor(transport,QpidAcceptor.Transport.TCP, supported));
- CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
- }
- }
-
- if (serverConfig.getEnableSSL())
- {
- final String keystorePath = serverConfig.getConnectorKeyStorePath();
- final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
- final String keystoreType = serverConfig.getConnectorKeyStoreType();
- final String keyManagerFactoryAlgorithm = serverConfig.getConnectorKeyManagerFactoryAlgorithm();
- final SSLContext sslContext;
- if(serverConfig.getConnectorTrustStorePath()!=null)
- {
- sslContext = SSLContextFactory.buildClientContext(serverConfig.getConnectorTrustStorePath(),
- serverConfig.getConnectorTrustStorePassword(),
- serverConfig.getConnectorTrustStoreType(),
- serverConfig.getConnectorTrustManagerFactoryAlgorithm(),
- keystorePath,
- keystorePassword, keystoreType, keyManagerFactoryAlgorithm,
- serverConfig.getCertAlias());
- }
- else
- {
- sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm);
- }
-
- for(int sslPort : sslPorts)
- {
- final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort);
-
- final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
- include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8, serverConfig);
- final NetworkTransportConfiguration settings =
- new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
-
- final IncomingNetworkTransport transport = Transport.getIncomingTransportInstance();
- final MultiVersionProtocolEngineFactory protocolEngineFactory =
- new MultiVersionProtocolEngineFactory(supported, defaultSupportedProtocolReply);
-
- transport.accept(settings, protocolEngineFactory, sslContext);
-
- ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
- new QpidAcceptor(transport,QpidAcceptor.Transport.SSL, supported));
- CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort));
- }
- }
-
- CurrentActor.get().message(BrokerMessages.READY());
+ storeLocation = new File(qpidWork, BrokerOptions.DEFAULT_CONFIG_FILE + "." + storeType).getAbsolutePath();
}
- finally
- {
- // Startup is complete so remove the AR initialised Startup actor
- CurrentActor.remove();
- }
- }
- private static Set<AmqpProtocolVersion> getSupportedVersions(final int port,
- final Set<Integer> exclude_1_0,
- final Set<Integer> exclude_0_10,
- final Set<Integer> exclude_0_9_1,
- final Set<Integer> exclude_0_9,
- final Set<Integer> exclude_0_8,
- final Set<Integer> include_1_0,
- final Set<Integer> include_0_10,
- final Set<Integer> include_0_9_1,
- final Set<Integer> include_0_9,
- final Set<Integer> include_0_8,
- final ServerConfiguration serverConfig)
- {
- final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
+ CurrentActor.get().message(BrokerMessages.CONFIG(storeLocation));
- if((exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) && !include_1_0.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v1_0_0);
- }
+ File logConfigFile = getConfigFile(options.getLogConfigFile(), BrokerOptions.DEFAULT_LOG_CONFIG_FILE, qpidHome, false);
+ configureLogging(logConfigFile, options.getLogWatchFrequency());
- if((exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) && !include_0_10.contains(port))
- {
- supported.remove(AmqpProtocolVersion.v0_10);
- }
+ BrokerConfigurationStoreCreator storeCreator = new BrokerConfigurationStoreCreator();
+ ConfigurationEntryStore store = storeCreator.createStore(storeLocation, storeType,
+ options.getInitialConfigurationStoreLocation(), options.getInitialConfigurationStoreLocation());
- if((exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) && !include_0_9_1.contains(port))
+ if (options.isManagementMode())
{
- supported.remove(AmqpProtocolVersion.v0_9_1);
+ store = new ManagementModeStoreHandler(store, options);
}
- if((exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) && !include_0_9.contains(port))
+ _applicationRegistry = new ApplicationRegistry(store);
+ try
{
- supported.remove(AmqpProtocolVersion.v0_9);
+ _applicationRegistry.initialise();
}
-
- if((exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) && !include_0_8.contains(port))
+ catch(Exception e)
{
- supported.remove(AmqpProtocolVersion.v0_8);
+ try
+ {
+ _applicationRegistry.close();
+ }
+ catch(Exception ce)
+ {
+ LOGGER.debug("An error occured when closing the registry following initialization failure", ce);
+ }
+ throw e;
}
- return supported;
}
+
private File getConfigFile(final String fileName,
final String defaultFileName,
final String qpidHome, boolean throwOnFileNotFound) throws InitException
@@ -368,11 +174,11 @@ public class Broker
if (!configFile.exists() && throwOnFileNotFound)
{
- String error = "File " + fileName + " could not be found. Check the file exists and is readable.";
+ String error = "File " + configFile + " could not be found. Check the file exists and is readable.";
if (qpidHome == null)
{
- error = error + "\nNote: " + BrokerOptions.QPID_HOME + " is not set.";
+ error = error + "\nNote: " + BrokerProperties.PROPERTY_QPID_HOME + " is not set.";
}
throw new InitException(error, null);
@@ -399,37 +205,6 @@ public class Broker
}
}
- /**
- * Update the configuration data with the management port.
- * @param configuration
- * @param registryServerPort The string from the command line
- */
- private void updateManagementPorts(ServerConfiguration configuration, Integer registryServerPort, Integer connectorServerPort)
- {
- if (registryServerPort != null)
- {
- try
- {
- configuration.setJMXPortRegistryServer(registryServerPort);
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid management (registry server) port: " + registryServerPort, null);
- }
- }
- if (connectorServerPort != null)
- {
- try
- {
- configuration.setJMXPortConnectorServer(connectorServerPort);
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid management (connector server) port: " + connectorServerPort, null);
- }
- }
- }
-
private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
{
if (logConfigFile.exists() && logConfigFile.canRead())
@@ -443,7 +218,7 @@ public class Broker
// log4j expects the watch interval in milliseconds
try
{
- LoggingFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
+ LoggingManagementFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
}
catch (Exception e)
{
@@ -454,7 +229,7 @@ public class Broker
{
try
{
- LoggingFacade.configure(logConfigFile.getPath());
+ LoggingManagementFacade.configure(logConfigFile.getPath());
}
catch (Exception e)
{
@@ -531,6 +306,24 @@ public class Broker
LOGGER.debug("Skipping shutdown hook removal as there either isnt one, or we are it.");
}
}
+ /**
+ * Workaround that prevents AMQShortStrings cache from being left in the thread local. This is important
+ * when embedding the Broker in containers where the starting thread may not belong to Qpid.
+ * The long term solution here is to stop our use of AMQShortString outside the AMQP transport layer.
+ */
+ private void clearAMQShortStringCache()
+ {
+ AMQShortString.clearLocalCache();
+ }
+
+ public org.apache.qpid.server.model.Broker getBroker()
+ {
+ if (_applicationRegistry == null)
+ {
+ return null;
+ }
+ return _applicationRegistry.getBroker();
+ }
private class ShutdownService implements Runnable
{
@@ -540,4 +333,5 @@ public class Broker
Broker.this.shutdown();
}
}
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java Thu Feb 28 16:14:30 2013
@@ -20,66 +20,25 @@
*/
package org.apache.qpid.server;
-import org.osgi.framework.BundleContext;
-
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
public class BrokerOptions
{
- public static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
+ public static final String DEFAULT_STORE_TYPE = "json";
+ public static final String DEFAULT_CONFIG_FILE = "config";
public static final String DEFAULT_LOG_CONFIG_FILE = "etc/log4j.xml";
- public static final String QPID_HOME = "QPID_HOME";
- public static final String QPID_WORK = "QPID_WORK";
-
- private final Set<Integer> _ports = new HashSet<Integer>();
- private final Set<Integer> _sslPorts = new HashSet<Integer>();
- private final Map<ProtocolExclusion,Set<Integer>> _exclusionMap = new HashMap<ProtocolExclusion, Set<Integer>>();
- private final Map<ProtocolInclusion,Set<Integer>> _inclusionMap = new HashMap<ProtocolInclusion, Set<Integer>>();
- private String _configFile;
private String _logConfigFile;
- private String _bind;
- private Integer _jmxPortRegistryServer;
- private Integer _jmxPortConnectorServer;
- private BundleContext _bundleContext;
-
private Integer _logWatchFrequency = 0;
- private String _qpidWorkFolder;
- private String _qpidHomeFolder;
- public void addPort(final int port)
- {
- _ports.add(port);
- }
+ private String _configurationStoreLocation;
+ private String _configurationStoreType = DEFAULT_STORE_TYPE;
- public void addSSLPort(final int sslPort)
- {
- _sslPorts.add(sslPort);
- }
+ private String _initialConfigurationStoreLocation;
+ private String _initialConfigurationStoreType = DEFAULT_STORE_TYPE;
- public Set<Integer> getPorts()
- {
- return Collections.unmodifiableSet(_ports);
- }
-
- public Set<Integer> getSSLPorts()
- {
- return Collections.unmodifiableSet(_sslPorts);
- }
-
- public String getConfigFile()
- {
- return _configFile;
- }
-
- public void setConfigFile(final String configFile)
- {
- _configFile = configFile;
- }
+ private boolean _managementMode;
+ private int _managementModeRmiPort;
+ private int _managementModeConnectorPort;
+ private int _managementModeHttpPort;
public String getLogConfigFile()
{
@@ -91,110 +50,97 @@ public class BrokerOptions
_logConfigFile = logConfigFile;
}
- public Integer getJmxPortRegistryServer()
+ public int getLogWatchFrequency()
{
- return _jmxPortRegistryServer;
+ return _logWatchFrequency;
}
- public void setJmxPortRegistryServer(final int jmxPortRegistryServer)
+ /**
+ * Set the frequency with which the log config file will be checked for updates.
+ * @param logWatchFrequency frequency in seconds
+ */
+ public void setLogWatchFrequency(final int logWatchFrequency)
{
- _jmxPortRegistryServer = jmxPortRegistryServer;
+ _logWatchFrequency = logWatchFrequency;
}
- public Integer getJmxPortConnectorServer()
+ public String getConfigurationStoreLocation()
{
- return _jmxPortConnectorServer;
+ return _configurationStoreLocation;
}
- public void setJmxPortConnectorServer(final int jmxPortConnectorServer)
+ public void setConfigurationStoreLocation(String cofigurationStore)
{
- _jmxPortConnectorServer = jmxPortConnectorServer;
+ _configurationStoreLocation = cofigurationStore;
}
- public String getQpidHome()
+
+ public String getConfigurationStoreType()
{
- return _qpidHomeFolder == null? System.getProperty(QPID_HOME): _qpidHomeFolder;
+ return _configurationStoreType;
}
- public Set<Integer> getExcludedPorts(final ProtocolExclusion excludeProtocol)
+ public void setConfigurationStoreType(String cofigurationStoreType)
{
- final Set<Integer> excludedPorts = _exclusionMap.get(excludeProtocol);
- return excludedPorts == null ? Collections.<Integer>emptySet() : excludedPorts;
+ _configurationStoreType = cofigurationStoreType;
}
- public void addExcludedPort(final ProtocolExclusion excludeProtocol, final int port)
+ public void setInitialConfigurationStoreLocation(String initialConfigurationStore)
{
- if (!_exclusionMap.containsKey(excludeProtocol))
- {
- _exclusionMap.put(excludeProtocol, new HashSet<Integer>());
- }
-
- Set<Integer> ports = _exclusionMap.get(excludeProtocol);
- ports.add(port);
+ _initialConfigurationStoreLocation = initialConfigurationStore;
}
- public String getBind()
+ public void setInitialConfigurationStoreType(String initialConfigurationStoreType)
{
- return _bind;
+ _initialConfigurationStoreType = initialConfigurationStoreType;
}
- public void setBind(final String bind)
+ public String getInitialConfigurationStoreLocation()
{
- _bind = bind;
+ return _initialConfigurationStoreLocation;
}
- public int getLogWatchFrequency()
+ public String getInitialConfigurationStoreType()
{
- return _logWatchFrequency;
+ return _initialConfigurationStoreType;
}
- /**
- * Set the frequency with which the log config file will be checked for updates.
- * @param logWatchFrequency frequency in seconds
- */
- public void setLogWatchFrequency(final int logWatchFrequency)
+ public boolean isManagementMode()
{
- _logWatchFrequency = logWatchFrequency;
+ return _managementMode;
}
- public BundleContext getBundleContext()
+ public void setManagementMode(boolean managementMode)
{
- return _bundleContext ;
+ _managementMode = managementMode;
}
- public void setBundleContext(final BundleContext bundleContext)
+ public int getManagementModeRmiPort()
{
- _bundleContext = bundleContext;
+ return _managementModeRmiPort;
}
- public Set<Integer> getIncludedPorts(final ProtocolInclusion includeProtocol)
+ public void setManagementModeRmiPort(int managementModeRmiPort)
{
- final Set<Integer> includedPorts = _inclusionMap.get(includeProtocol);
- return includedPorts == null ? Collections.<Integer>emptySet() : includedPorts;
+ _managementModeRmiPort = managementModeRmiPort;
}
- public void addIncludedPort(final ProtocolInclusion includeProtocol, final int port)
+ public int getManagementModeConnectorPort()
{
- if (!_inclusionMap.containsKey(includeProtocol))
- {
- _inclusionMap.put(includeProtocol, new HashSet<Integer>());
- }
-
- Set<Integer> ports = _inclusionMap.get(includeProtocol);
- ports.add(port);
+ return _managementModeConnectorPort;
}
- public String getQpidWork()
+ public void setManagementModeConnectorPort(int managementModeConnectorPort)
{
- return _qpidWorkFolder;
+ _managementModeConnectorPort = managementModeConnectorPort;
}
- public void setQpidWork(String qpidWorkFolder)
+ public int getManagementModeHttpPort()
{
- _qpidWorkFolder = qpidWorkFolder;
+ return _managementModeHttpPort;
}
- public void setQpidHome(String qpidHomeFolder)
+ public void setManagementModeHttpPort(int managementModeHttpPort)
{
- _qpidHomeFolder = qpidHomeFolder;
+ _managementModeHttpPort = managementModeHttpPort;
}
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java Thu Feb 28 16:14:30 2013
@@ -30,9 +30,6 @@ import org.apache.commons.cli.PosixParse
import org.apache.log4j.Logger;
import org.apache.qpid.common.QpidProperties;
import org.apache.qpid.framing.ProtocolVersion;
-import org.apache.qpid.server.Broker.InitException;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-
/**
* Main entry point for AMQPD.
@@ -45,86 +42,17 @@ public class Main
private static final Option OPTION_VERSION = new Option("v", "version", false, "print the version information and exit");
- private static final Option OPTION_CONFIG_FILE =
- OptionBuilder.withArgName("file").hasArg().withDescription("use given configuration file").withLongOpt("config")
- .create("c");
-
- private static final Option OPTION_PORT =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("listen on the specified port. Overrides any value in the config file")
- .withLongOpt("port").create("p");
-
- private static final Option OPTION_SSLPORT =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("SSL port. Overrides any value in the config file")
- .withLongOpt("sslport").create("s");
-
-
- private static final Option OPTION_EXCLUDE_1_0 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP1-0 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-1-0").create();
-
- private static final Option OPTION_EXCLUDE_0_10 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-10 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-10").create();
-
- private static final Option OPTION_EXCLUDE_0_9_1 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-9-1 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-9-1").create();
-
- private static final Option OPTION_EXCLUDE_0_9 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-9 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-9").create();
-
- private static final Option OPTION_EXCLUDE_0_8 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
- .withLongOpt("exclude-0-8").create();
-
- private static final Option OPTION_INCLUDE_1_0 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("accept AMQP1-0 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
- .withLongOpt("include-1-0").create();
-
-private static final Option OPTION_INCLUDE_0_10 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("accept AMQP0-10 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
- .withLongOpt("include-0-10").create();
-
-private static final Option OPTION_INCLUDE_0_9_1 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("accept AMQP0-9-1 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
- .withLongOpt("include-0-9-1").create();
-
-private static final Option OPTION_INCLUDE_0_9 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("accept AMQP0-9 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
- .withLongOpt("include-0-9").create();
-
-private static final Option OPTION_INCLUDE_0_8 =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("accept AMQP0-8 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
- .withLongOpt("include-0-8").create();
-
-
- private static final Option OPTION_JMX_PORT_REGISTRY_SERVER =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("listen on the specified management (registry server) port. Overrides any value in the config file")
- .withLongOpt("jmxregistryport").create("m");
-
- private static final Option OPTION_JMX_PORT_CONNECTOR_SERVER =
- OptionBuilder.withArgName("port").hasArg()
- .withDescription("listen on the specified management (connector server) port. Overrides any value in the config file")
- .withLongOpt("jmxconnectorport").create();
-
- private static final Option OPTION_BIND =
- OptionBuilder.withArgName("address").hasArg()
- .withDescription("bind to the specified address. Overrides any value in the config file")
- .withLongOpt("bind").create("b");
+ private static final Option OPTION_CONFIGURATION_STORE_PATH = OptionBuilder.withArgName("path").hasArg()
+ .withDescription("use given configuration store location").withLongOpt("store-path").create("sp");
+
+ private static final Option OPTION_CONFIGURATION_STORE_TYPE = OptionBuilder.withArgName("type").hasArg()
+ .withDescription("use given store type").withLongOpt("store-type").create("st");
+
+ private static final Option OPTION_INITIAL_CONFIGURATION_STORE_PATH = OptionBuilder.withArgName("path").hasArg()
+ .withDescription("pass the location of initial store to use to create a user store").withLongOpt("initial-store-path").create("isp");
+
+ private static final Option OPTION_INITIAL_CONFIGURATION_STORE_TYPE = OptionBuilder.withArgName("type").hasArg()
+ .withDescription("the type of initial store").withLongOpt("initial-store-type").create("ist");
private static final Option OPTION_LOG_CONFIG_FILE =
OptionBuilder.withArgName("file").hasArg()
@@ -137,31 +65,31 @@ private static final Option OPTION_INCLU
.withDescription("monitor the log file configuration file for changes. Units are seconds. "
+ "Zero means do not check for changes.").withLongOpt("logwatch").create("w");
+ private static final Option OPTION_MANAGEMENT_MODE = OptionBuilder.withDescription("start broker in a management mode")
+ .withLongOpt("management-mode").create("mm");
+ private static final Option OPTION_RMI_PORT = OptionBuilder.withArgName("port").hasArg()
+ .withDescription("override jmx rmi port in management mode").withLongOpt("jmxregistryport").create("rmi");
+ private static final Option OPTION_CONNECTOR_PORT = OptionBuilder.withArgName("port").hasArg()
+ .withDescription("override jmx connector port in management mode").withLongOpt("jmxconnectorport").create("jmxrmi");
+ private static final Option OPTION_HTTP_PORT = OptionBuilder.withArgName("port").hasArg()
+ .withDescription("override web management port in management mode").withLongOpt("httpport").create("http");
+
private static final Options OPTIONS = new Options();
static
{
OPTIONS.addOption(OPTION_HELP);
OPTIONS.addOption(OPTION_VERSION);
- OPTIONS.addOption(OPTION_CONFIG_FILE);
+ OPTIONS.addOption(OPTION_CONFIGURATION_STORE_PATH);
+ OPTIONS.addOption(OPTION_CONFIGURATION_STORE_TYPE);
OPTIONS.addOption(OPTION_LOG_CONFIG_FILE);
OPTIONS.addOption(OPTION_LOG_WATCH);
- OPTIONS.addOption(OPTION_PORT);
- OPTIONS.addOption(OPTION_SSLPORT);
- OPTIONS.addOption(OPTION_EXCLUDE_1_0);
- OPTIONS.addOption(OPTION_EXCLUDE_0_10);
- OPTIONS.addOption(OPTION_EXCLUDE_0_9_1);
- OPTIONS.addOption(OPTION_EXCLUDE_0_9);
- OPTIONS.addOption(OPTION_EXCLUDE_0_8);
- OPTIONS.addOption(OPTION_INCLUDE_1_0);
- OPTIONS.addOption(OPTION_INCLUDE_0_10);
- OPTIONS.addOption(OPTION_INCLUDE_0_9_1);
- OPTIONS.addOption(OPTION_INCLUDE_0_9);
- OPTIONS.addOption(OPTION_INCLUDE_0_8);
- OPTIONS.addOption(OPTION_BIND);
-
- OPTIONS.addOption(OPTION_JMX_PORT_REGISTRY_SERVER);
- OPTIONS.addOption(OPTION_JMX_PORT_CONNECTOR_SERVER);
+ OPTIONS.addOption(OPTION_INITIAL_CONFIGURATION_STORE_PATH);
+ OPTIONS.addOption(OPTION_INITIAL_CONFIGURATION_STORE_TYPE);
+ OPTIONS.addOption(OPTION_MANAGEMENT_MODE);
+ OPTIONS.addOption(OPTION_RMI_PORT);
+ OPTIONS.addOption(OPTION_CONNECTOR_PORT);
+ OPTIONS.addOption(OPTION_HTTP_PORT);
}
protected CommandLine _commandLine;
@@ -243,10 +171,15 @@ private static final Option OPTION_INCLU
else
{
BrokerOptions options = new BrokerOptions();
- String configFile = _commandLine.getOptionValue(OPTION_CONFIG_FILE.getOpt());
- if(configFile != null)
+ String configurationStore = _commandLine.getOptionValue(OPTION_CONFIGURATION_STORE_PATH.getOpt());
+ if (configurationStore != null)
+ {
+ options.setConfigurationStoreLocation(configurationStore);
+ }
+ String configurationStoreType = _commandLine.getOptionValue(OPTION_CONFIGURATION_STORE_TYPE.getOpt());
+ if (configurationStoreType != null)
{
- options.setConfigFile(configFile);
+ options.setConfigurationStoreType(configurationStoreType);
}
String logWatchConfig = _commandLine.getOptionValue(OPTION_LOG_WATCH.getOpt());
@@ -261,52 +194,37 @@ private static final Option OPTION_INCLU
options.setLogConfigFile(logConfig);
}
- String jmxPortRegistryServer = _commandLine.getOptionValue(OPTION_JMX_PORT_REGISTRY_SERVER.getOpt());
- if(jmxPortRegistryServer != null)
+ String initialConfigurationStore = _commandLine.getOptionValue(OPTION_INITIAL_CONFIGURATION_STORE_PATH.getOpt());
+ if (initialConfigurationStore != null)
{
- options.setJmxPortRegistryServer(Integer.parseInt(jmxPortRegistryServer));
+ options.setInitialConfigurationStoreLocation(initialConfigurationStore);
}
-
- String jmxPortConnectorServer = _commandLine.getOptionValue(OPTION_JMX_PORT_CONNECTOR_SERVER.getLongOpt());
- if(jmxPortConnectorServer != null)
+ String initailConfigurationStoreType = _commandLine.getOptionValue(OPTION_INITIAL_CONFIGURATION_STORE_TYPE.getOpt());
+ if (initailConfigurationStoreType != null)
{
- options.setJmxPortConnectorServer(Integer.parseInt(jmxPortConnectorServer));
+ options.setInitialConfigurationStoreType(initailConfigurationStoreType);
}
- String bindAddr = _commandLine.getOptionValue(OPTION_BIND.getOpt());
- if (bindAddr != null)
+ boolean managmentMode = _commandLine.hasOption(OPTION_MANAGEMENT_MODE.getOpt());
+ if (managmentMode)
{
- options.setBind(bindAddr);
- }
-
- String[] portStr = _commandLine.getOptionValues(OPTION_PORT.getOpt());
- if(portStr != null)
- {
- parsePortArray(options, portStr, false);
- for(ProtocolExclusion pe : ProtocolExclusion.values())
+ options.setManagementMode(true);
+ String rmiPort = _commandLine.getOptionValue(OPTION_RMI_PORT.getOpt());
+ if (rmiPort != null)
{
- parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe);
+ options.setManagementModeRmiPort(Integer.parseInt(rmiPort));
}
- for(ProtocolInclusion pe : ProtocolInclusion.values())
+ String connectorPort = _commandLine.getOptionValue(OPTION_CONNECTOR_PORT.getOpt());
+ if (connectorPort != null)
{
- parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe);
+ options.setManagementModeConnectorPort(Integer.parseInt(connectorPort));
}
- }
-
- String[] sslPortStr = _commandLine.getOptionValues(OPTION_SSLPORT.getOpt());
- if(sslPortStr != null)
- {
- parsePortArray(options, sslPortStr, true);
- for(ProtocolExclusion pe : ProtocolExclusion.values())
- {
- parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe);
- }
- for(ProtocolInclusion pe : ProtocolInclusion.values())
+ String httpPort = _commandLine.getOptionValue(OPTION_HTTP_PORT.getOpt());
+ if (httpPort != null)
{
- parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe);
+ options.setManagementModeHttpPort(Integer.parseInt(httpPort));
}
}
-
setExceptionHandler();
startBroker(options);
@@ -389,72 +307,7 @@ private static final Option OPTION_INCLU
protected void shutdown(final int status)
{
- ApplicationRegistry.remove();
System.exit(status);
}
- private static void parsePortArray(final BrokerOptions options,final Object[] ports,
- final boolean ssl) throws InitException
- {
- if(ports != null)
- {
- for(int i = 0; i < ports.length; i++)
- {
- try
- {
- if(ssl)
- {
- options.addSSLPort(Integer.parseInt(String.valueOf(ports[i])));
- }
- else
- {
- options.addPort(Integer.parseInt(String.valueOf(ports[i])));
- }
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid port: " + ports[i], e);
- }
- }
- }
- }
-
- private static void parsePortArray(final BrokerOptions options, final Object[] ports,
- final ProtocolExclusion excludedProtocol) throws InitException
- {
- if(ports != null)
- {
- for(int i = 0; i < ports.length; i++)
- {
- try
- {
- options.addExcludedPort(excludedProtocol,
- Integer.parseInt(String.valueOf(ports[i])));
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid port for exclusion: " + ports[i], e);
- }
- }
- }
- }
-
- private static void parseProtocolInclusions(final BrokerOptions options, final Object[] ports,
- final ProtocolInclusion includedProtocol) throws InitException
- {
- if(ports != null)
- {
- for(int i = 0; i < ports.length; i++)
- {
- try
- {
- options.addIncludedPort(includedProtocol, Integer.parseInt(String.valueOf(ports[i])));
- }
- catch (NumberFormatException e)
- {
- throw new InitException("Invalid port for inclusion: " + ports[i], e);
- }
- }
- }
- }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/TransactionTimeoutHelper.java Thu Feb 28 16:14:30 2013
@@ -18,46 +18,85 @@
*/
package org.apache.qpid.server;
-import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogMessage;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
+import org.apache.qpid.server.txn.ServerTransaction;
public class TransactionTimeoutHelper
{
- private static final Logger LOGGER = Logger.getLogger(TransactionTimeoutHelper.class);
-
- public static final String IDLE_TRANSACTION_ALERT = "IDLE TRANSACTION ALERT";
- public static final String OPEN_TRANSACTION_ALERT = "OPEN TRANSACTION ALERT";
+ private static final String OPEN_TRANSACTION_TIMEOUT_ERROR = "Open transaction timed out";
+ private static final String IDLE_TRANSACTION_TIMEOUT_ERROR = "Idle transaction timed out";
private final LogSubject _logSubject;
- public TransactionTimeoutHelper(final LogSubject logSubject)
+ private final CloseAction _closeAction;
+
+ public TransactionTimeoutHelper(final LogSubject logSubject, final CloseAction closeAction)
{
_logSubject = logSubject;
+ _closeAction = closeAction;
}
- public void logIfNecessary(final long timeSoFar, final long warnTimeout,
- final LogMessage message, final String alternateLogPrefix)
+ public void checkIdleOrOpenTimes(ServerTransaction transaction, long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
{
- if (isTimedOut(timeSoFar, warnTimeout))
+ if (transaction.isTransactional())
{
- LogActor logActor = CurrentActor.get();
- if(logActor.getRootMessageLogger().isMessageEnabled(logActor, _logSubject, message.getLogHierarchy()))
+ final long transactionUpdateTime = transaction.getTransactionUpdateTime();
+ if(transactionUpdateTime > 0)
{
- logActor.message(_logSubject, message);
+ long idleTime = System.currentTimeMillis() - transactionUpdateTime;
+ boolean closed = logAndCloseIfNecessary(idleTime, idleWarn, idleClose, ChannelMessages.IDLE_TXN(idleTime), IDLE_TRANSACTION_TIMEOUT_ERROR);
+ if (closed)
+ {
+ return; // no point proceeding to check the open time
+ }
}
- else
+
+ final long transactionStartTime = transaction.getTransactionStartTime();
+ if(transactionStartTime > 0)
{
- LOGGER.warn(alternateLogPrefix + " " + _logSubject.toLogString() + " " + timeSoFar + " ms");
+ long openTime = System.currentTimeMillis() - transactionStartTime;
+ logAndCloseIfNecessary(openTime, openWarn, openClose, ChannelMessages.OPEN_TXN(openTime), OPEN_TRANSACTION_TIMEOUT_ERROR);
}
}
}
- public boolean isTimedOut(long timeSoFar, long timeout)
+ /**
+ * @return true iff closeTimeout was exceeded
+ */
+ private boolean logAndCloseIfNecessary(final long timeSoFar,
+ final long warnTimeout, final long closeTimeout,
+ final LogMessage warnMessage, final String closeMessage) throws AMQException
+ {
+ if (isTimedOut(timeSoFar, warnTimeout))
+ {
+ LogActor logActor = CurrentActor.get();
+ logActor.message(_logSubject, warnMessage);
+ }
+
+ if(isTimedOut(timeSoFar, closeTimeout))
+ {
+ _closeAction.doTimeoutAction(closeMessage);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ private boolean isTimedOut(long timeSoFar, long timeout)
{
return timeout > 0L && timeSoFar > timeout;
}
+
+ public interface CloseAction
+ {
+ void doTimeoutAction(String reason) throws AMQException;
+ }
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Thu Feb 28 16:14:30 2013
@@ -35,13 +35,15 @@ public class Binding
private final Exchange _exchange;
private final Map<String, Object> _arguments;
private final UUID _id;
- private final UUID _qmfId;
private final AtomicLong _matches = new AtomicLong();
- public Binding(UUID id, UUID qmfId, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+ public Binding(UUID id,
+ final String bindingKey,
+ final AMQQueue queue,
+ final Exchange exchange,
+ final Map<String, Object> arguments)
{
_id = id;
- _qmfId = qmfId;
_bindingKey = bindingKey;
_queue = queue;
_exchange = exchange;
@@ -53,11 +55,6 @@ public class Binding
return _id;
}
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
public String getBindingKey()
{
return _bindingKey;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Thu Feb 28 16:14:30 2013
@@ -24,10 +24,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.BindingConfig;
-import org.apache.qpid.server.configuration.BindingConfigType;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
@@ -52,7 +48,7 @@ public class BindingFactory
_virtualHost = vhost;
}
- private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task, BindingConfig
+ private final class BindingImpl extends Binding implements AMQQueue.Task, Exchange.Task
{
private final BindingLogSubject _logSubject;
//TODO : persist creation time
@@ -60,7 +56,7 @@ public class BindingFactory
private BindingImpl(UUID id, String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
{
- super(id, queue.getVirtualHost().getConfigStore().createId(), bindingKey, queue, exchange, arguments);
+ super(id, bindingKey, queue, exchange, arguments);
_logSubject = new BindingLogSubject(bindingKey,exchange,queue);
}
@@ -96,16 +92,6 @@ public class BindingFactory
return _createTime;
}
- public BindingConfigType getConfigType()
- {
- return BindingConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return _virtualHost;
- }
-
public boolean isDurable()
{
return getQueue().isDurable() && getExchange().isDurable();
@@ -186,7 +172,6 @@ public class BindingFactory
exchange.addCloseTask(b);
queue.addBinding(b);
exchange.addBinding(b);
- getConfigStore().addConfiguredObject(b);
b.logCreation();
return true;
@@ -197,11 +182,6 @@ public class BindingFactory
}
}
- private ConfigStore getConfigStore()
- {
- return _virtualHost.getConfigStore();
- }
-
public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
{
makeBinding(id, bindingKey,queue,exchange,argumentMap,true, false);
@@ -257,7 +237,6 @@ public class BindingFactory
_virtualHost.getMessageStore().unbindQueue(b);
}
b.logDestruction();
- getConfigStore().removeConfiguredObject(b);
}
return b;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Thu Feb 28 16:14:30 2013
@@ -24,11 +24,11 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.ConfigurationException;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.AbstractConfiguration;
import java.util.List;
-public class QueueConfiguration extends ConfigurationPlugin
+public class QueueConfiguration extends AbstractConfiguration
{
private String _name;
private VirtualHostConfiguration _vHostConfig;
@@ -39,7 +39,7 @@ public class QueueConfiguration extends
_name = name;
CompositeConfiguration mungedConf = new CompositeConfiguration();
- mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues.queue." + name));
+ mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues.queue." + escapeTagName(name)));
mungedConf.addConfiguration(_vHostConfig.getConfig().subset("queues"));
setConfiguration("virtualhosts.virtualhost.queues.queue", mungedConf);
@@ -193,43 +193,4 @@ public class QueueConfiguration extends
{
return getBooleanValue("deadLetterQueues", _vHostConfig.isDeadLetterQueueEnabled());
}
-
- public static class QueueConfig extends ConfigurationPlugin
- {
- @Override
- public String[] getElementsProcessed()
- {
- return new String[]{"name"};
- }
-
- public String getName()
- {
- return getStringValue("name");
- }
-
-
- public void validateConfiguration() throws ConfigurationException
- {
- if (getConfig().isEmpty())
- {
- throw new ConfigurationException("Queue section cannot be empty.");
- }
-
- if (getName() == null)
- {
- throw new ConfigurationException("Queue section must have a 'name' element.");
- }
-
- }
-
-
- @Override
- public String formatToString()
- {
- return "Name:"+getName();
- }
-
-
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org