You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [13/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp...
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java Fri Aug 3 12:13:32 2012
@@ -70,6 +70,7 @@ public class ManagementExchange implemen
private final Set<Binding> _bindingSet = new CopyOnWriteArraySet<Binding>();
private UUID _id;
+ private UUID _qmfId;
private static final String AGENT_BANK = "0";
private int _bindingCountHigh;
@@ -84,7 +85,7 @@ public class ManagementExchange implemen
private class ManagementQueue implements BaseQueue
{
- private final UUID QUEUE_ID = UUIDGenerator.generateUUID();
+ private final UUID QUEUE_ID = UUIDGenerator.generateRandomUUID();
private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + QUEUE_ID.toString();
private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING);
@@ -196,6 +197,7 @@ public class ManagementExchange implemen
_virtualHost = host;
_id = id;
_virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost));
+ _qmfId = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
getQMFService().addListener(this);
}
@@ -205,6 +207,12 @@ public class ManagementExchange implemen
return _id;
}
+ @Override
+ public UUID getQMFId()
+ {
+ return _qmfId;
+ }
+
public ExchangeConfigType getConfigType()
{
return ExchangeConfigType.getInstance();
@@ -540,6 +548,11 @@ public class ManagementExchange implemen
return getMsgReceives();
}
+ public long getMsgDrops()
+ {
+ return 0l;
+ }
+
public long getByteReceives()
{
return _bytesReceived.get();
@@ -550,6 +563,11 @@ public class ManagementExchange implemen
return getByteReceives();
}
+ public long getByteDrops()
+ {
+ return 0l;
+ }
+
public long getCreateTime()
{
return _createTime;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFMessage.java Fri Aug 3 12:13:32 2012
@@ -21,6 +21,8 @@
package org.apache.qpid.qmf;
+import java.util.Collection;
+import java.util.Collections;
import org.apache.commons.lang.NotImplementedException;
import org.apache.qpid.framing.AMQShortString;
@@ -111,6 +113,16 @@ public class QMFMessage implements Serve
return 0;
}
+ public String getUserId()
+ {
+ return null;
+ }
+
+ public String getAppId()
+ {
+ return null;
+ }
+
public String getMessageId()
{
return null;
@@ -166,6 +178,12 @@ public class QMFMessage implements Serve
return false;
}
+ @Override
+ public Collection<String> getHeaderNames()
+ {
+ return Collections.EMPTY_SET;
+ }
+
public boolean containsHeader(String name)
{
return false;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFObject.java Fri Aug 3 12:13:32 2012
@@ -28,7 +28,7 @@ public abstract class QMFObject<C extend
public interface Delegate
{
- UUID getId();
+ UUID getQMFId();
long getCreateTime();
}
@@ -49,7 +49,7 @@ public abstract class QMFObject<C extend
public final UUID getId()
{
- return _delegate.getId();
+ return _delegate.getQMFId();
}
public final long getCreateTime()
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Fri Aug 3 12:13:32 2012
@@ -436,7 +436,7 @@ public class QMFService implements Confi
QMFObject qmfObject = classObjects.remove(object);
if(qmfObject != null)
{
- _managedObjectsById.get(qmfClass).remove(object.getId());
+ _managedObjectsById.get(qmfClass).remove(object.getQMFId());
objectRemoved(qmfObject);
}
}
@@ -468,7 +468,7 @@ public class QMFService implements Confi
}
}
- classObjectsById.put(object.getId(),qmfObject);
+ classObjectsById.put(object.getQMFId(),qmfObject);
if(classObjects.putIfAbsent(object, qmfObject) == null)
{
@@ -570,7 +570,7 @@ public class QMFService implements Confi
public UUID getSystemId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public String getOsName()
@@ -598,9 +598,9 @@ public class QMFService implements Confi
return _obj.getOSArchitecture();
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -964,9 +964,9 @@ public class QMFService implements Confi
return factory.createResponseCommand(CompletionCode.NOT_IMPLEMENTED);
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -1004,9 +1004,9 @@ public class QMFService implements Confi
return _obj.getFederationTag();
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -1135,9 +1135,9 @@ public class QMFService implements Confi
return _obj.getByteRoutes();
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -1470,9 +1470,9 @@ public class QMFService implements Confi
return _obj.getArguments();
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -1526,9 +1526,9 @@ public class QMFService implements Confi
return _obj.getMatches();
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -1647,9 +1647,9 @@ public class QMFService implements Confi
return factory.createResponseCommand();
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -1741,6 +1741,12 @@ public class QMFService implements Confi
return 0l;
}
+ public Long getUnackedMessages()
+ {
+ // TODO
+ return 0l;
+ }
+
public Long getTxnStarts()
{
return _obj.getTxnStarts();
@@ -1799,9 +1805,9 @@ public class QMFService implements Confi
return factory.createResponseCommand();
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -1870,9 +1876,9 @@ public class QMFService implements Confi
return _obj.getDelivered();
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -1955,14 +1961,20 @@ public class QMFService implements Confi
return _obj.getAckBatching();
}
+ /* support TBD */
+ public String getName()
+ {
+ return null;
+ }
+
public BrokerSchema.BridgeClass.CloseMethodResponseCommand close(final BrokerSchema.BridgeClass.CloseMethodResponseCommandFactory factory)
{
return null;
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
@@ -2020,6 +2032,18 @@ public class QMFService implements Confi
return _obj.getLastError();
}
+ /* support TBD */
+ public String getName()
+ {
+ return null;
+ }
+
+ /* support TBD */
+ public BrokerSchema.ConnectionObject getConnectionRef()
+ {
+ return (BrokerSchema.ConnectionObject) null;
+ }
+
public BrokerSchema.LinkClass.CloseMethodResponseCommand close(final BrokerSchema.LinkClass.CloseMethodResponseCommandFactory factory)
{
_obj.close();
@@ -2042,9 +2066,9 @@ public class QMFService implements Confi
return factory.createResponseCommand();
}
- public UUID getId()
+ public UUID getQMFId()
{
- return _obj.getId();
+ return _obj.getQMFId();
}
public long getCreateTime()
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Fri Aug 3 12:13:32 2012
@@ -23,7 +23,9 @@ package org.apache.qpid.server;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -32,9 +34,9 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Lock;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -89,7 +91,6 @@ import org.apache.qpid.server.subscripti
import org.apache.qpid.server.subscription.RecordDeliveryMethod;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.SubscriptionFactoryImpl;
-import org.apache.qpid.server.subscription.SubscriptionImpl;
import org.apache.qpid.server.txn.AsyncAutoCommitTransaction;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -137,11 +138,9 @@ public class AMQChannel implements Sessi
private final LinkedList<AsyncCommand> _unfinishedCommandsQueue = new LinkedList<AsyncCommand>();
- private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
-
private UnacknowledgedMessageMap _unacknowledgedMessageMap = new UnacknowledgedMessageMapImpl(DEFAULT_PREFETCH);
- // Set of messages being acknoweledged in the current transaction
+ // Set of messages being acknowledged in the current transaction
private SortedSet<QueueEntry> _acknowledgedMessages = new TreeSet<QueueEntry>();
private final AtomicBoolean _suspended = new AtomicBoolean(false);
@@ -157,7 +156,7 @@ public class AMQChannel implements Sessi
private final AMQProtocolSession _session;
private AtomicBoolean _closing = new AtomicBoolean(false);
- private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+ private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
@@ -170,11 +169,13 @@ public class AMQChannel implements Sessi
private List<QueueEntry> _resendList = new ArrayList<QueueEntry>();
private static final
AMQShortString IMMEDIATE_DELIVERY_REPLY_TEXT = new AMQShortString("Immediate delivery is not possible.");
- private final UUID _id;
+ private final UUID _qmfId;
private long _createTime = System.currentTimeMillis();
private final ClientDeliveryMethod _clientDeliveryMethod;
+ private final TransactionTimeoutHelper _transactionTimeoutHelper;
+
public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore)
throws AMQException
{
@@ -183,7 +184,7 @@ public class AMQChannel implements Sessi
_actor = new AMQPChannelActor(this, session.getLogActor().getRootMessageLogger());
_logSubject = new ChannelLogSubject(this);
- _id = getConfigStore().createId();
+ _qmfId = getConfigStore().createId();
_actor.message(ChannelMessages.CREATE());
getConfigStore().addConfiguredObject(this);
@@ -194,6 +195,8 @@ public class AMQChannel implements Sessi
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
_clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
}
public ConfigStore getConfigStore()
@@ -264,6 +267,11 @@ public class AMQChannel implements Sessi
return _txnCount.get();
}
+ public Long getTxnStart()
+ {
+ return _txnStarts.get();
+ }
+
public int getChannelId()
{
return _channelId;
@@ -441,7 +449,7 @@ public class AMQChannel implements Sessi
* @param acks Are acks enabled for this subscriber
* @param filters Filters to apply to this subscriber
*
- * @param noLocal Flag stopping own messages being receivied.
+ * @param noLocal Flag stopping own messages being received.
* @param exclusive Flag requesting exclusive access to the queue
* @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests
*
@@ -948,9 +956,11 @@ public class AMQChannel implements Sessi
public void commit() throws AMQException
{
- commit(null);
+ commit(null, false);
}
- public void commit(Runnable immediateAction) throws AMQException
+
+
+ public void commit(final Runnable immediateAction, boolean async) throws AMQException
{
if (!isTransactional())
@@ -958,11 +968,29 @@ public class AMQChannel implements Sessi
throw new AMQException("Fatal error: commit called on non-transactional channel");
}
- _transaction.commit(immediateAction);
+ if(async && _transaction instanceof LocalTransaction)
+ {
+
+ ((LocalTransaction)_transaction).commitAsync(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ immediateAction.run();
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
+ });
+ }
+ else
+ {
+ _transaction.commit(immediateAction);
- _txnCommits.incrementAndGet();
- _txnStarts.incrementAndGet();
- decrementOutstandingTxnsIfNecessary();
+ _txnCommits.incrementAndGet();
+ _txnStarts.incrementAndGet();
+ decrementOutstandingTxnsIfNecessary();
+ }
}
public void rollback() throws AMQException
@@ -1357,9 +1385,34 @@ public class AMQChannel implements Sessi
return _actor;
}
- public void block(AMQQueue queue)
+ public synchronized void block()
{
- if(_blockingQueues.add(queue))
+ if(_blockingEntities.add(this))
+ {
+ if(_blocking.compareAndSet(false,true))
+ {
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED("** All Queues **"));
+ flow(false);
+ }
+ }
+ }
+
+ public synchronized void unblock()
+ {
+ if(_blockingEntities.remove(this))
+ {
+ if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false))
+ {
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+
+ flow(true);
+ }
+ }
+ }
+
+ public synchronized void block(AMQQueue queue)
+ {
+ if(_blockingEntities.add(queue))
{
if(_blocking.compareAndSet(false,true))
@@ -1370,11 +1423,11 @@ public class AMQChannel implements Sessi
}
}
- public void unblock(AMQQueue queue)
+ public synchronized void unblock(AMQQueue queue)
{
- if(_blockingQueues.remove(queue))
+ if(_blockingEntities.remove(queue))
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
+ if(_blockingEntities.isEmpty() && _blocking.compareAndSet(true,false) && !isClosing())
{
_actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
@@ -1393,6 +1446,11 @@ public class AMQChannel implements Sessi
return false;
}
+ public int getUnacknowledgedMessageCount()
+ {
+ return getUnacknowledgedMessageMap().size();
+ }
+
private void flow(boolean flow)
{
MethodRegistry methodRegistry = _session.getMethodRegistry();
@@ -1400,6 +1458,7 @@ public class AMQChannel implements Sessi
_session.writeFrame(responseBody.generateFrame(_channelId));
}
+ @Override
public boolean getBlocking()
{
return _blocking.get();
@@ -1456,9 +1515,10 @@ public class AMQChannel implements Sessi
return false;
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public String getSessionName()
@@ -1484,30 +1544,42 @@ public class AMQChannel implements Sessi
long openTime = currentTime - _transaction.getTransactionStartTime();
long idleTime = currentTime - _txnUpdateTime.get();
- // Log a warning on idle or open transactions
- if (idleWarn > 0L && idleTime > idleWarn)
- {
- CurrentActor.get().message(_logSubject, ChannelMessages.IDLE_TXN(idleTime));
- _logger.warn("IDLE TRANSACTION ALERT " + _logSubject.toString() + " " + idleTime + " ms");
- }
- else if (openWarn > 0L && openTime > openWarn)
+ _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
+ TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
{
- CurrentActor.get().message(_logSubject, ChannelMessages.OPEN_TXN(openTime));
- _logger.warn("OPEN TRANSACTION ALERT " + _logSubject.toString() + " " + openTime + " ms");
+ closeConnection("Idle transaction timed out");
+ return;
}
- // Close connection for idle or open transactions that have timed out
- if (idleClose > 0L && idleTime > idleClose)
+ _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
+ TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
{
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
- }
- else if (openClose > 0L && openTime > openClose)
- {
- getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ closeConnection("Open transaction timed out");
+ return;
}
}
}
+ /**
+ * Typically called from the HouseKeepingThread instead of the main receiver thread,
+ * therefore uses a lock to close the connection in a thread-safe manner.
+ */
+ private void closeConnection(String reason) throws AMQException
+ {
+ Lock receivedLock = _session.getReceivedLock();
+ receivedLock.lock();
+ try
+ {
+ _session.close(AMQConstant.RESOURCE_ERROR, reason);
+ }
+ finally
+ {
+ receivedLock.unlock();
+ }
+ }
+
public void deadLetter(long deliveryTag) throws AMQException
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
@@ -1563,23 +1635,6 @@ public class AMQChannel implements Sessi
_unfinishedCommandsQueue.add(new AsyncCommand(future, action));
}
- public void completeAsyncCommands()
- {
- AsyncCommand cmd;
- while((cmd = _unfinishedCommandsQueue.peek()) != null && cmd.isReadyForCompletion())
- {
- cmd.complete();
- _unfinishedCommandsQueue.poll();
- }
- while(_unfinishedCommandsQueue.size() > UNFINISHED_COMMAND_QUEUE_THRESHOLD)
- {
- cmd = _unfinishedCommandsQueue.poll();
- cmd.awaitReadyForCompletion();
- cmd.complete();
- }
- }
-
-
public void sync()
{
AsyncCommand cmd;
@@ -1588,6 +1643,10 @@ public class AMQChannel implements Sessi
cmd.awaitReadyForCompletion();
cmd.complete();
}
+ if(_transaction instanceof LocalTransaction)
+ {
+ ((LocalTransaction)_transaction).sync();
+ }
}
private static class AsyncCommand
@@ -1624,6 +1683,12 @@ public class AMQChannel implements Sessi
public int compareTo(AMQSessionModel session)
{
- return getId().compareTo(session.getId());
+ return getQMFId().compareTo(session.getQMFId());
+ }
+
+ @Override
+ public int getConsumerCount()
+ {
+ return _tag2SubscriptionMap.size();
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Broker.java Fri Aug 3 12:13:32 2012
@@ -20,19 +20,22 @@
*/
package org.apache.qpid.server;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.*;
+import javax.net.ssl.SSLContext;
import org.apache.log4j.Logger;
import org.apache.log4j.PropertyConfigurator;
-import org.apache.log4j.xml.QpidLog4JConfigurator;
-
import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.configuration.ServerNetworkTransportConfiguration;
-import org.apache.qpid.server.configuration.management.ConfigurationManagementMBean;
-import org.apache.qpid.server.information.management.ServerInformationMBean;
import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
-import org.apache.qpid.server.logging.management.LoggingManagementMBean;
+import org.apache.qpid.server.logging.log4j.LoggingFacade;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.server.protocol.MultiVersionProtocolEngineFactory;
@@ -46,30 +49,10 @@ import org.apache.qpid.transport.network
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-import javax.net.ssl.SSLContext;
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
-import java.util.EnumSet;
-import java.util.Formatter;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import java.util.logging.ConsoleHandler;
-import java.util.logging.FileHandler;
-import java.util.logging.Handler;
-import java.util.logging.Level;
-import java.util.logging.LogRecord;
-
public class Broker
{
private static final Logger LOGGER = Logger.getLogger(Broker.class);
- private static final int IPV4_ADDRESS_LENGTH = 4;
- private static final char IPV4_LITERAL_SEPARATOR = '.';
private volatile Thread _shutdownHookThread;
protected static class InitException extends RuntimeException
@@ -128,6 +111,14 @@ public class Broker
ConfigurationFileApplicationRegistry config = new ConfigurationFileApplicationRegistry(configFile, options.getBundleContext());
ServerConfiguration serverConfig = config.getConfiguration();
+ if (options.getQpidWork() != null)
+ {
+ serverConfig.setQpidWork(options.getQpidWork());
+ }
+ if (options.getQpidHome() != null)
+ {
+ serverConfig.setQpidHome(options.getQpidHome());
+ }
updateManagementPorts(serverConfig, options.getJmxPortRegistryServer(), options.getJmxPortConnectorServer());
ApplicationRegistry.initialise(config);
@@ -145,14 +136,6 @@ public class Broker
try
{
- configureLoggingManagementMBean(logConfigFile, options.getLogWatchFrequency());
-
- ConfigurationManagementMBean configMBean = new ConfigurationManagementMBean();
- configMBean.register();
-
- ServerInformationMBean sysInfoMBean = new ServerInformationMBean(config);
- sysInfoMBean.register();
-
Set<Integer> ports = new HashSet<Integer>(options.getPorts());
if(ports.isEmpty())
{
@@ -165,36 +148,71 @@ public class Broker
parsePortList(sslPorts, serverConfig.getSSLPorts());
}
+ //1-0 excludes and includes
Set<Integer> exclude_1_0 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v1_0));
if(exclude_1_0.isEmpty())
{
parsePortList(exclude_1_0, serverConfig.getPortExclude10());
}
+ Set<Integer> include_1_0 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v1_0));
+ if(include_1_0.isEmpty())
+ {
+ parsePortList(include_1_0, serverConfig.getPortInclude10());
+ }
+
+ //0-10 excludes and includes
Set<Integer> exclude_0_10 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_10));
if(exclude_0_10.isEmpty())
{
parsePortList(exclude_0_10, serverConfig.getPortExclude010());
}
+ Set<Integer> include_0_10 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_10));
+ if(include_0_10.isEmpty())
+ {
+ parsePortList(include_0_10, serverConfig.getPortInclude010());
+ }
+
+ //0-9-1 excludes and includes
Set<Integer> exclude_0_9_1 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9_1));
if(exclude_0_9_1.isEmpty())
{
parsePortList(exclude_0_9_1, serverConfig.getPortExclude091());
}
+ Set<Integer> include_0_9_1 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9_1));
+ if(include_0_9_1.isEmpty())
+ {
+ parsePortList(include_0_9_1, serverConfig.getPortInclude091());
+ }
+
+ //0-9 excludes and includes
Set<Integer> exclude_0_9 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_9));
if(exclude_0_9.isEmpty())
{
parsePortList(exclude_0_9, serverConfig.getPortExclude09());
}
+ Set<Integer> include_0_9 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_9));
+ if(include_0_9.isEmpty())
+ {
+ parsePortList(include_0_9, serverConfig.getPortInclude09());
+ }
+
+ //0-8 excludes and includes
Set<Integer> exclude_0_8 = new HashSet<Integer>(options.getExcludedPorts(ProtocolExclusion.v0_8));
if(exclude_0_8.isEmpty())
{
parsePortList(exclude_0_8, serverConfig.getPortExclude08());
}
+ Set<Integer> include_0_8 = new HashSet<Integer>(options.getIncludedPorts(ProtocolInclusion.v0_8));
+ if(include_0_8.isEmpty())
+ {
+ parsePortList(include_0_8, serverConfig.getPortInclude08());
+ }
+
String bindAddr = options.getBind();
if (bindAddr == null)
{
@@ -220,8 +238,8 @@ public class Broker
final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, port);
final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9,
- exclude_0_8, serverConfig);
+ getSupportedVersions(port, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
+ include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8,serverConfig);
final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
@@ -233,7 +251,7 @@ public class Broker
transport.accept(settings, protocolEngineFactory, null);
ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
- new QpidAcceptor(transport,"TCP"));
+ new QpidAcceptor(transport,QpidAcceptor.Transport.TCP, supported));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP", port));
}
}
@@ -242,16 +260,31 @@ public class Broker
{
final String keystorePath = serverConfig.getConnectorKeyStorePath();
final String keystorePassword = serverConfig.getConnectorKeyStorePassword();
+ final String keystoreType = serverConfig.getConnectorKeyStoreType();
final String keyManagerFactoryAlgorithm = serverConfig.getConnectorKeyManagerFactoryAlgorithm();
- final SSLContext sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keyManagerFactoryAlgorithm);
+ final SSLContext sslContext;
+ if(serverConfig.getConnectorTrustStorePath()!=null)
+ {
+ sslContext = SSLContextFactory.buildClientContext(serverConfig.getConnectorTrustStorePath(),
+ serverConfig.getConnectorTrustStorePassword(),
+ serverConfig.getConnectorTrustStoreType(),
+ serverConfig.getConnectorTrustManagerFactoryAlgorithm(),
+ keystorePath,
+ keystorePassword, keystoreType, keyManagerFactoryAlgorithm,
+ serverConfig.getCertAlias());
+ }
+ else
+ {
+ sslContext = SSLContextFactory.buildServerContext(keystorePath, keystorePassword, keystoreType, keyManagerFactoryAlgorithm);
+ }
for(int sslPort : sslPorts)
{
final InetSocketAddress inetSocketAddress = new InetSocketAddress(bindAddress, sslPort);
final Set<AmqpProtocolVersion> supported =
- getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1,
- exclude_0_9, exclude_0_8, serverConfig);
+ getSupportedVersions(sslPort, exclude_1_0, exclude_0_10, exclude_0_9_1, exclude_0_9, exclude_0_8,
+ include_1_0, include_0_10, include_0_9_1, include_0_9, include_0_8, serverConfig);
final NetworkTransportConfiguration settings =
new ServerNetworkTransportConfiguration(serverConfig, inetSocketAddress, Transport.TCP);
@@ -262,7 +295,7 @@ public class Broker
transport.accept(settings, protocolEngineFactory, sslContext);
ApplicationRegistry.getInstance().addAcceptor(inetSocketAddress,
- new QpidAcceptor(transport,"TCP"));
+ new QpidAcceptor(transport,QpidAcceptor.Transport.SSL, supported));
CurrentActor.get().message(BrokerMessages.LISTENING("TCP/SSL", sslPort));
}
}
@@ -282,27 +315,36 @@ public class Broker
final Set<Integer> exclude_0_9_1,
final Set<Integer> exclude_0_9,
final Set<Integer> exclude_0_8,
+ final Set<Integer> include_1_0,
+ final Set<Integer> include_0_10,
+ final Set<Integer> include_0_9_1,
+ final Set<Integer> include_0_9,
+ final Set<Integer> include_0_8,
final ServerConfiguration serverConfig)
{
final EnumSet<AmqpProtocolVersion> supported = EnumSet.allOf(AmqpProtocolVersion.class);
- if(exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled())
+ if((exclude_1_0.contains(port) || !serverConfig.isAmqp10enabled()) && !include_1_0.contains(port))
{
supported.remove(AmqpProtocolVersion.v1_0_0);
}
- if(exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled())
+
+ if((exclude_0_10.contains(port) || !serverConfig.isAmqp010enabled()) && !include_0_10.contains(port))
{
supported.remove(AmqpProtocolVersion.v0_10);
}
- if(exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled())
+
+ if((exclude_0_9_1.contains(port) || !serverConfig.isAmqp091enabled()) && !include_0_9_1.contains(port))
{
supported.remove(AmqpProtocolVersion.v0_9_1);
}
- if(exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled())
+
+ if((exclude_0_9.contains(port) || !serverConfig.isAmqp09enabled()) && !include_0_9.contains(port))
{
supported.remove(AmqpProtocolVersion.v0_9);
}
- if(exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled())
+
+ if((exclude_0_8.contains(port) || !serverConfig.isAmqp08enabled()) && !include_0_8.contains(port))
{
supported.remove(AmqpProtocolVersion.v0_8);
}
@@ -388,7 +430,7 @@ public class Broker
}
}
- private void configureLogging(File logConfigFile, long logWatchTime) throws InitException, IOException
+ private void configureLogging(File logConfigFile, int logWatchTime) throws InitException, IOException
{
if (logConfigFile.exists() && logConfigFile.canRead())
{
@@ -401,7 +443,7 @@ public class Broker
// log4j expects the watch interval in milliseconds
try
{
- QpidLog4JConfigurator.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
+ LoggingFacade.configureAndWatch(logConfigFile.getPath(), logWatchTime * 1000);
}
catch (Exception e)
{
@@ -412,7 +454,7 @@ public class Broker
{
try
{
- QpidLog4JConfigurator.configure(logConfigFile.getPath());
+ LoggingFacade.configure(logConfigFile.getPath());
}
catch (Exception e)
{
@@ -446,12 +488,6 @@ public class Broker
}
}
- private void configureLoggingManagementMBean(File logConfigFile, int logWatchTime) throws Exception
- {
- LoggingManagementMBean blm = new LoggingManagementMBean(logConfigFile.getPath(),logWatchTime);
-
- blm.register();
- }
private void addShutdownHook()
{
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/BrokerOptions.java Fri Aug 3 12:13:32 2012
@@ -33,10 +33,12 @@ public class BrokerOptions
public static final String DEFAULT_CONFIG_FILE = "etc/config.xml";
public static final String DEFAULT_LOG_CONFIG_FILE = "etc/log4j.xml";
public static final String QPID_HOME = "QPID_HOME";
+ public static final String QPID_WORK = "QPID_WORK";
private final Set<Integer> _ports = new HashSet<Integer>();
private final Set<Integer> _sslPorts = new HashSet<Integer>();
private final Map<ProtocolExclusion,Set<Integer>> _exclusionMap = new HashMap<ProtocolExclusion, Set<Integer>>();
+ private final Map<ProtocolInclusion,Set<Integer>> _inclusionMap = new HashMap<ProtocolInclusion, Set<Integer>>();
private String _configFile;
private String _logConfigFile;
@@ -46,6 +48,8 @@ public class BrokerOptions
private BundleContext _bundleContext;
private Integer _logWatchFrequency = 0;
+ private String _qpidWorkFolder;
+ private String _qpidHomeFolder;
public void addPort(final int port)
{
@@ -108,7 +112,7 @@ public class BrokerOptions
}
public String getQpidHome()
{
- return System.getProperty(QPID_HOME);
+ return _qpidHomeFolder == null? System.getProperty(QPID_HOME): _qpidHomeFolder;
}
public Set<Integer> getExcludedPorts(final ProtocolExclusion excludeProtocol)
@@ -161,4 +165,36 @@ public class BrokerOptions
{
_bundleContext = bundleContext;
}
+
+ public Set<Integer> getIncludedPorts(final ProtocolInclusion includeProtocol)
+ {
+ final Set<Integer> includedPorts = _inclusionMap.get(includeProtocol);
+ return includedPorts == null ? Collections.<Integer>emptySet() : includedPorts;
+ }
+
+ public void addIncludedPort(final ProtocolInclusion includeProtocol, final int port)
+ {
+ if (!_inclusionMap.containsKey(includeProtocol))
+ {
+ _inclusionMap.put(includeProtocol, new HashSet<Integer>());
+ }
+
+ Set<Integer> ports = _inclusionMap.get(includeProtocol);
+ ports.add(port);
+ }
+
+ public String getQpidWork()
+ {
+ return _qpidWorkFolder;
+ }
+
+ public void setQpidWork(String qpidWorkFolder)
+ {
+ _qpidWorkFolder = qpidWorkFolder;
+ }
+
+ public void setQpidHome(String qpidHomeFolder)
+ {
+ _qpidHomeFolder = qpidHomeFolder;
+ }
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/Main.java Fri Aug 3 12:13:32 2012
@@ -85,6 +85,32 @@ public class Main
.withDescription("when listening on the specified port do not accept AMQP0-8 connections. The specified port must be one specified on the command line")
.withLongOpt("exclude-0-8").create();
+ private static final Option OPTION_INCLUDE_1_0 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("accept AMQP1-0 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+ .withLongOpt("include-1-0").create();
+
+private static final Option OPTION_INCLUDE_0_10 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("accept AMQP0-10 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+ .withLongOpt("include-0-10").create();
+
+private static final Option OPTION_INCLUDE_0_9_1 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("accept AMQP0-9-1 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+ .withLongOpt("include-0-9-1").create();
+
+private static final Option OPTION_INCLUDE_0_9 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("accept AMQP0-9 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+ .withLongOpt("include-0-9").create();
+
+private static final Option OPTION_INCLUDE_0_8 =
+ OptionBuilder.withArgName("port").hasArg()
+ .withDescription("accept AMQP0-8 connections on this port, overriding configuration to the contrary. The specified port must be one specified on the command line")
+ .withLongOpt("include-0-8").create();
+
+
private static final Option OPTION_JMX_PORT_REGISTRY_SERVER =
OptionBuilder.withArgName("port").hasArg()
.withDescription("listen on the specified management (registry server) port. Overrides any value in the config file")
@@ -127,6 +153,11 @@ public class Main
OPTIONS.addOption(OPTION_EXCLUDE_0_9_1);
OPTIONS.addOption(OPTION_EXCLUDE_0_9);
OPTIONS.addOption(OPTION_EXCLUDE_0_8);
+ OPTIONS.addOption(OPTION_INCLUDE_1_0);
+ OPTIONS.addOption(OPTION_INCLUDE_0_10);
+ OPTIONS.addOption(OPTION_INCLUDE_0_9_1);
+ OPTIONS.addOption(OPTION_INCLUDE_0_9);
+ OPTIONS.addOption(OPTION_INCLUDE_0_8);
OPTIONS.addOption(OPTION_BIND);
OPTIONS.addOption(OPTION_JMX_PORT_REGISTRY_SERVER);
@@ -256,6 +287,10 @@ public class Main
{
parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe);
}
+ for(ProtocolInclusion pe : ProtocolInclusion.values())
+ {
+ parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe);
+ }
}
String[] sslPortStr = _commandLine.getOptionValues(OPTION_SSLPORT.getOpt());
@@ -266,6 +301,10 @@ public class Main
{
parsePortArray(options, _commandLine.getOptionValues(pe.getExcludeName()), pe);
}
+ for(ProtocolInclusion pe : ProtocolInclusion.values())
+ {
+ parseProtocolInclusions(options, _commandLine.getOptionValues(pe.getIncludeName()), pe);
+ }
}
setExceptionHandler();
@@ -399,4 +438,23 @@ public class Main
}
}
}
+
+ private static void parseProtocolInclusions(final BrokerOptions options, final Object[] ports,
+ final ProtocolInclusion includedProtocol) throws InitException
+ {
+ if(ports != null)
+ {
+ for(int i = 0; i < ports.length; i++)
+ {
+ try
+ {
+ options.addIncludedPort(includedProtocol, Integer.parseInt(String.valueOf(ports[i])));
+ }
+ catch (NumberFormatException e)
+ {
+ throw new InitException("Invalid port for inclusion: " + ports[i], e);
+ }
+ }
+ }
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java Fri Aug 3 12:13:32 2012
@@ -35,11 +35,13 @@ public class Binding
private final Exchange _exchange;
private final Map<String, Object> _arguments;
private final UUID _id;
+ private final UUID _qmfId;
private final AtomicLong _matches = new AtomicLong();
- public Binding(UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+ public Binding(UUID id, UUID qmfId, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
{
_id = id;
+ _qmfId = qmfId;
_bindingKey = bindingKey;
_queue = queue;
_exchange = exchange;
@@ -51,6 +53,11 @@ public class Binding
return _id;
}
+ public UUID getQMFId()
+ {
+ return _qmfId;
+ }
+
public String getBindingKey()
{
return _bindingKey;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java Fri Aug 3 12:13:32 2012
@@ -60,7 +60,7 @@ public class BindingFactory
private BindingImpl(UUID id, String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
{
- super(id, bindingKey, queue, exchange, arguments);
+ super(id, queue.getVirtualHost().getConfigStore().createId(), bindingKey, queue, exchange, arguments);
_logSubject = new BindingLogSubject(bindingKey,exchange,queue);
}
@@ -166,7 +166,7 @@ public class BindingFactory
if (id == null)
{
- id = UUIDGenerator.generateUUID();
+ id = UUIDGenerator.generateBindingUUID(exchange.getName(), queue.getName(), bindingKey, _virtualHost.getName());
}
BindingImpl b = new BindingImpl(id, bindingKey, queue, exchange, arguments);
BindingImpl existingMapping = _bindings.putIfAbsent(b, b);
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfigStore.java Fri Aug 3 12:13:32 2012
@@ -101,7 +101,7 @@ public class ConfigStore
}
- typeMap.put(object.getId(), object);
+ typeMap.put(object.getQMFId(), object);
sendEvent(Event.CREATED, object);
}
@@ -111,7 +111,7 @@ public class ConfigStore
ConcurrentHashMap typeMap = _typeMap.get(object.getConfigType());
if(typeMap != null)
{
- typeMap.remove(object.getId());
+ typeMap.remove(object.getQMFId());
sendEvent(Event.DELETED, object);
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfiguredObject.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfiguredObject.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfiguredObject.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ConfiguredObject.java Fri Aug 3 12:13:32 2012
@@ -25,7 +25,7 @@ import java.util.UUID;
public interface ConfiguredObject<T extends ConfigObjectType<T,C>, C extends ConfiguredObject<T, C>>
{
- public UUID getId();
+ public UUID getQMFId();
public T getConfigType();
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfig.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfig.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ExchangeConfig.java Fri Aug 3 12:13:32 2012
@@ -49,7 +49,12 @@ public interface ExchangeConfig extends
long getMsgRoutes();
+ long getMsgDrops();
+
long getByteReceives();
long getByteRoutes();
+
+ long getByteDrops();
+
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/LinkConfigType.java Fri Aug 3 12:13:32 2012
@@ -76,7 +76,7 @@ public final class LinkConfigType extend
}
};
- public static final LinkReadOnlyProperty<Integer> PORT_PROPERTY = new LinkReadOnlyProperty<Integer>("host")
+ public static final LinkReadOnlyProperty<Integer> PORT_PROPERTY = new LinkReadOnlyProperty<Integer>("port")
{
public Integer getValue(LinkConfig object)
{
@@ -134,4 +134,4 @@ public final class LinkConfigType extend
-}
\ No newline at end of file
+}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/QueueConfiguration.java Fri Aug 3 12:13:32 2012
@@ -126,6 +126,11 @@ public class QueueConfiguration extends
return _name;
}
+ public String getDescription()
+ {
+ return getStringValue("description");
+ }
+
public int getMaximumMessageAge()
{
return getIntValue("maximumMessageAge", _vHostConfig.getMaximumMessageAge());
@@ -226,4 +231,5 @@ public class QueueConfiguration extends
}
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerConfiguration.java Fri Aug 3 12:13:32 2012
@@ -20,6 +20,16 @@
package org.apache.qpid.server.configuration;
+import java.io.File;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Map.Entry;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationException;
@@ -28,7 +38,6 @@ import org.apache.commons.configuration.
import org.apache.commons.configuration.SystemConfiguration;
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.log4j.Logger;
-
import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.protocol.AmqpProtocolVersion;
@@ -40,17 +49,6 @@ import org.apache.qpid.server.virtualhos
import static org.apache.qpid.transport.ConnectionSettings.WILDCARD_ADDRESS;
-import java.io.File;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import javax.net.ssl.KeyManagerFactory;
-
public class ServerConfiguration extends ConfigurationPlugin
{
protected static final Logger _logger = Logger.getLogger(ServerConfiguration.class);
@@ -66,6 +64,9 @@ public class ServerConfiguration extends
public static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L;
public static final int DEFAULT_JMXPORT_REGISTRYSERVER = 8999;
public static final int JMXPORT_CONNECTORSERVER_OFFSET = 100;
+ public static final int DEFAULT_HTTP_MANAGEMENT_PORT = 8080;
+ public static final int DEFAULT_HTTPS_MANAGEMENT_PORT = 8443;
+ public static final long DEFAULT_MINIMUM_ALERT_REPEAT_GAP = 30000l;
public static final String QPID_HOME = "QPID_HOME";
public static final String QPID_WORK = "QPID_WORK";
@@ -77,6 +78,8 @@ public class ServerConfiguration extends
private File _configFile;
private File _vhostsFile;
+ private String _qpidWork;
+ private String _qpidHome;
// Map of environment variables to config items
private static final Map<String, String> envVarMap = new HashMap<String, String>();
@@ -86,6 +89,9 @@ public class ServerConfiguration extends
public static final String MGMT_CUSTOM_REGISTRY_SOCKET = "management.custom-registry-socket";
public static final String MGMT_JMXPORT_REGISTRYSERVER = "management.jmxport.registryServer";
public static final String MGMT_JMXPORT_CONNECTORSERVER = "management.jmxport.connectorServer";
+ public static final String SECURITY_DEFAULT_AUTH_MANAGER = "security.default-auth-manager";
+ public static final String SECURITY_PORT_MAPPINGS_PORT_MAPPING_AUTH_MANAGER = "security.port-mappings.port-mapping.auth-manager";
+ public static final String SECURITY_PORT_MAPPINGS_PORT_MAPPING_PORT = "security.port-mappings.port-mapping.port";
public static final String STATUS_UPDATES = "status-updates";
public static final String ADVANCED_LOCALE = "advanced.locale";
public static final String CONNECTOR_AMQP10ENABLED = "connector.amqp10enabled";
@@ -94,6 +100,11 @@ public class ServerConfiguration extends
public static final String CONNECTOR_AMQP09ENABLED = "connector.amqp09enabled";
public static final String CONNECTOR_AMQP08ENABLED = "connector.amqp08enabled";
public static final String CONNECTOR_AMQP_SUPPORTED_REPLY = "connector.amqpDefaultSupportedProtocolReply";
+ public static final String CONNECTOR_INCLUDE_10 = "connector.include10";
+ public static final String CONNECTOR_INCLUDE_010 = "connector.include010";
+ public static final String CONNECTOR_INCLUDE_091 = "connector.include091";
+ public static final String CONNECTOR_INCLUDE_09 = "connector.include09";
+ public static final String CONNECTOR_INCLUDE_08 = "connector.include08";
{
envVarMap.put("QPID_PORT", "connector.port");
@@ -104,6 +115,8 @@ public class ServerConfiguration extends
envVarMap.put("QPID_MSGAUTH", "security.msg-auth");
envVarMap.put("QPID_AUTOREGISTER", "auto_register");
envVarMap.put("QPID_MANAGEMENTENABLED", "management.enabled");
+ envVarMap.put("QPID_HTTPMANAGEMENTENABLED", "management.http.enabled");
+ envVarMap.put("QPID_HTTPMANAGEMENTPORT", "management.http.port");
envVarMap.put("QPID_HEARTBEATDELAY", "heartbeat.delay");
envVarMap.put("QPID_HEARTBEATTIMEOUTFACTOR", "heartbeat.timeoutFactor");
envVarMap.put("QPID_MAXIMUMMESSAGEAGE", "maximumMessageAge");
@@ -177,7 +190,7 @@ public class ServerConfiguration extends
* This has been made a two step process to allow the Plugin Manager and
* Configuration Manager to be initialised in the Application Registry.
* <p>
- * If using this ServerConfiguration via an ApplicationRegistry there is no
+ * If using this ServerConfiguration via an ApplicationRegistry there is no
* need to explicitly call {@link #initialise()} as this is done via the
* {@link ApplicationRegistry#initialise()} method.
*
@@ -199,12 +212,12 @@ public class ServerConfiguration extends
* Called by {@link ApplicationRegistry#initialise()}.
* <p>
* NOTE: A DEFAULT ApplicationRegistry must exist when using this method
- * or a new ApplicationRegistry will be created.
+ * or a new ApplicationRegistry will be created.
*
* @throws ConfigurationException
*/
public void initialise() throws ConfigurationException
- {
+ {
setConfiguration("", getConfig());
setupVirtualHosts(getConfig());
}
@@ -219,10 +232,10 @@ public class ServerConfiguration extends
{
// Support for security.jmx.access was removed when JMX access rights were incorporated into the main ACL.
// This ensure that users remove the element from their configuration file.
-
+
if (getListValue("security.jmx.access").size() > 0)
{
- String message = "Validation error : security/jmx/access is no longer a supported element within the configuration xml."
+ String message = "Validation error : security/jmx/access is no longer a supported element within the configuration xml."
+ (_configFile == null ? "" : " Configuration file : " + _configFile);
throw new ConfigurationException(message);
}
@@ -236,7 +249,7 @@ public class ServerConfiguration extends
if (getListValue("security.principal-databases.principal-database(0).class").size() > 0)
{
- String message = "Validation error : security/principal-databases is no longer supported within the configuration xml."
+ String message = "Validation error : security/principal-databases is no longer supported within the configuration xml."
+ (_configFile == null ? "" : " Configuration file : " + _configFile);
throw new ConfigurationException(message);
}
@@ -249,6 +262,13 @@ public class ServerConfiguration extends
throw new ConfigurationException(message);
}
+ String[] ports = getConfig().getStringArray(SECURITY_PORT_MAPPINGS_PORT_MAPPING_PORT);
+ String[] authManagers = getConfig().getStringArray(SECURITY_PORT_MAPPINGS_PORT_MAPPING_AUTH_MANAGER);
+ if (ports.length != authManagers.length)
+ {
+ throw new ConfigurationException("Validation error: Each port-mapping must have exactly one port and exactly one auth-manager.");
+ }
+
// QPID-3517: Inconsistency in capitalisation in the SSL configuration keys used within the connector and management configuration
// sections. For the moment, continue to understand both but generate a deprecated warning if the less preferred keystore is used.
for (String key : new String[] {"management.ssl.keystorePath",
@@ -280,7 +300,7 @@ public class ServerConfiguration extends
@SuppressWarnings("unchecked")
protected void setupVirtualHosts(Configuration conf) throws ConfigurationException
{
- List<String> vhostFiles = conf.getList("virtualhosts");
+ List<String> vhostFiles = (List) conf.getList("virtualhosts");
Configuration vhostConfig = conf.subset("virtualhosts");
// Only one configuration mechanism allowed
@@ -470,7 +490,7 @@ public class ServerConfiguration extends
Configuration newConfig = parseConfig(_configFile);
setConfiguration("", newConfig);
ApplicationRegistry.getInstance().getSecurityManager().configureHostPlugins(this);
-
+
// Reload virtualhosts from correct location
Configuration newVhosts;
if (_vhostsFile == null)
@@ -495,15 +515,29 @@ public class ServerConfiguration extends
_logger.warn(SECURITY_CONFIG_RELOADED);
}
}
-
+
public String getQpidWork()
{
- return System.getProperty(QPID_WORK, System.getProperty("java.io.tmpdir"));
+ if ( _qpidWork == null )
+ {
+ return System.getProperty(QPID_WORK, System.getProperty("java.io.tmpdir"));
+ }
+ else
+ {
+ return _qpidWork;
+ }
}
-
+
public String getQpidHome()
{
- return System.getProperty(QPID_HOME);
+ if ( _qpidHome == null )
+ {
+ return System.getProperty(QPID_HOME);
+ }
+ else
+ {
+ return _qpidHome;
+ }
}
public void setJMXPortRegistryServer(int registryServerPort)
@@ -541,16 +575,36 @@ public class ServerConfiguration extends
return getBooleanValue("management.platform-mbeanserver", true);
}
+ public boolean getHTTPManagementEnabled()
+ {
+ return getBooleanValue("management.http.enabled", true);
+ }
+
+ public int getHTTPManagementPort()
+ {
+ return getIntValue("management.http.port", DEFAULT_HTTP_MANAGEMENT_PORT);
+ }
+
+ public boolean getHTTPSManagementEnabled()
+ {
+ return getBooleanValue("management.https.enabled", false);
+ }
+
+ public int getHTTPSManagementPort()
+ {
+ return getIntValue("management.https.port", DEFAULT_HTTPS_MANAGEMENT_PORT);
+ }
+
public String[] getVirtualHosts()
{
return _virtualHosts.keySet().toArray(new String[_virtualHosts.size()]);
}
-
+
public String getPluginDirectory()
{
return getStringValue("plugin-directory");
}
-
+
public String getCacheDirectory()
{
return getStringValue("cache-directory");
@@ -581,6 +635,26 @@ public class ServerConfiguration extends
return getBooleanValue("security.msg-auth");
}
+ public String getDefaultAuthenticationManager()
+ {
+ return getStringValue(SECURITY_DEFAULT_AUTH_MANAGER);
+ }
+
+ public Map<Integer, String> getPortAuthenticationMappings()
+ {
+ String[] ports = getConfig().getStringArray(SECURITY_PORT_MAPPINGS_PORT_MAPPING_PORT);
+ String[] authManagers = getConfig().getStringArray(SECURITY_PORT_MAPPINGS_PORT_MAPPING_AUTH_MANAGER);
+
+ Map<Integer,String> portMappings = new HashMap<Integer, String>();
+ for(int i = 0; i < ports.length; i++)
+ {
+ portMappings.put(Integer.valueOf(ports[i]), authManagers[i]);
+ }
+
+ return portMappings;
+ }
+
+
public String getManagementKeyStorePath()
{
final String fallback = getStringValue("management.ssl.keystorePath");
@@ -589,7 +663,7 @@ public class ServerConfiguration extends
public boolean getManagementSSLEnabled()
{
- return getBooleanValue("management.ssl.enabled", true);
+ return getBooleanValue("management.ssl.enabled", false);
}
public String getManagementKeyStorePassword()
@@ -603,16 +677,11 @@ public class ServerConfiguration extends
return getBooleanValue("queue.auto_register", true);
}
- public boolean getManagementEnabled()
+ public boolean getJMXManagementEnabled()
{
return getBooleanValue("management.enabled", true);
}
- public void setManagementEnabled(boolean enabled)
- {
- getConfig().setProperty("management.enabled", enabled);
- }
-
public int getHeartBeatDelay()
{
return getIntValue("heartbeat.delay", 5);
@@ -645,7 +714,7 @@ public class ServerConfiguration extends
public long getMinimumAlertRepeatGap()
{
- return getLongValue("minimumAlertRepeatGap");
+ return getLongValue("minimumAlertRepeatGap", DEFAULT_MINIMUM_ALERT_REPEAT_GAP);
}
public long getCapacity()
@@ -693,6 +762,31 @@ public class ServerConfiguration extends
return getListValue("connector.non08port");
}
+ public List getPortInclude08()
+ {
+ return getListValue(CONNECTOR_INCLUDE_08);
+ }
+
+ public List getPortInclude09()
+ {
+ return getListValue(CONNECTOR_INCLUDE_09);
+ }
+
+ public List getPortInclude091()
+ {
+ return getListValue(CONNECTOR_INCLUDE_091);
+ }
+
+ public List getPortInclude010()
+ {
+ return getListValue(CONNECTOR_INCLUDE_010);
+ }
+
+ public List getPortInclude10()
+ {
+ return getListValue(CONNECTOR_INCLUDE_10);
+ }
+
public String getBind()
{
return getStringValue("connector.bind", WILDCARD_ADDRESS);
@@ -740,6 +834,11 @@ public class ServerConfiguration extends
return getStringValue("connector.ssl.keyStorePassword", fallback);
}
+ public String getConnectorKeyStoreType()
+ {
+ return getStringValue("connector.ssl.keyStoreType", "JKS");
+ }
+
public String getConnectorKeyManagerFactoryAlgorithm()
{
final String systemFallback = KeyManagerFactory.getDefaultAlgorithm();
@@ -748,6 +847,41 @@ public class ServerConfiguration extends
return getStringValue("connector.ssl.keyManagerFactoryAlgorithm", fallback);
}
+ public String getConnectorTrustStorePath()
+ {
+ return getStringValue("connector.ssl.trustStorePath", null);
+ }
+
+ public String getConnectorTrustStorePassword()
+ {
+ return getStringValue("connector.ssl.trustStorePassword", null);
+ }
+
+ public String getConnectorTrustStoreType()
+ {
+ return getStringValue("connector.ssl.trustStoreType", "JKS");
+ }
+
+ public String getConnectorTrustManagerFactoryAlgorithm()
+ {
+ return getStringValue("connector.ssl.trustManagerFactoryAlgorithm", TrustManagerFactory.getDefaultAlgorithm());
+ }
+
+ public String getCertAlias()
+ {
+ return getStringValue("connector.ssl.certAlias", null);
+ }
+
+ public boolean needClientAuth()
+ {
+ return getConfig().getBoolean("connector.ssl.needClientAuth", false);
+ }
+
+ public boolean wantClientAuth()
+ {
+ return getConfig().getBoolean("connector.ssl.wantClientAuth", false);
+ }
+
public String getDefaultVirtualHost()
{
return getStringValue("virtualhosts.default");
@@ -756,7 +890,7 @@ public class ServerConfiguration extends
public void setDefaultVirtualHost(String vhost)
{
getConfig().setProperty("virtualhosts.default", vhost);
- }
+ }
public void setHousekeepingCheckPeriod(long value)
{
@@ -883,4 +1017,15 @@ public class ServerConfiguration extends
return reply == null ? null : AmqpProtocolVersion.valueOf(reply);
}
+
+ public void setQpidWork(String path)
+ {
+ _qpidWork = path;
+ }
+
+ public void setQpidHome(String path)
+ {
+ _qpidHome = path;
+ }
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/ServerNetworkTransportConfiguration.java Fri Aug 3 12:13:32 2012
@@ -28,7 +28,7 @@ public class ServerNetworkTransportConfi
private final String _transport;
private InetSocketAddress _address;
- public ServerNetworkTransportConfiguration(final ServerConfiguration serverConfig,
+ public ServerNetworkTransportConfiguration(final ServerConfiguration serverConfig,
final InetSocketAddress address,
final String transport)
{
@@ -76,4 +76,15 @@ public class ServerNetworkTransportConfi
{
return _address;
}
+
+ public boolean needClientAuth()
+ {
+ return _serverConfig.needClientAuth();
+ }
+
+ @Override
+ public boolean wantClientAuth()
+ {
+ return _serverConfig.wantClientAuth();
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigImpl.java Fri Aug 3 12:13:32 2012
@@ -33,7 +33,7 @@ public class SystemConfigImpl implements
private static final String OS_ARCH = System.getProperty("os.arch");
private static final String OS_VERSION = System.getProperty("os.version");
- private final UUID _id;
+ private final UUID _qmfId;
private String _name;
private final String _host;
@@ -48,9 +48,9 @@ public class SystemConfigImpl implements
this(store.createId(), store);
}
- public SystemConfigImpl(UUID id, ConfigStore store)
+ public SystemConfigImpl(UUID qmfId, ConfigStore store)
{
- _id = id;
+ _qmfId = qmfId;
_store = store;
String host;
try
@@ -95,9 +95,10 @@ public class SystemConfigImpl implements
return OS_ARCH;
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public SystemConfigType getConfigType()
@@ -119,12 +120,12 @@ public class SystemConfigImpl implements
{
broker.setSystem(this);
_store.addConfiguredObject(broker);
- _brokers.put(broker.getId(), broker);
+ _brokers.put(broker.getQMFId(), broker);
}
public void removeBroker(final BrokerConfig broker)
{
- _brokers.remove(broker.getId());
+ _brokers.remove(broker.getQMFId());
_store.removeConfiguredObject(broker);
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigType.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigType.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigType.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/SystemConfigType.java Fri Aug 3 12:13:32 2012
@@ -65,7 +65,7 @@ public final class SystemConfigType exte
{
public UUID getValue(SystemConfig object)
{
- return object.getId();
+ return object.getQMFId();
}
};
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/TopicConfig.java Fri Aug 3 12:13:32 2012
@@ -61,7 +61,6 @@ public class TopicConfig extends Configu
throw new ConfigurationException("Topic section must have a 'name' or 'subscriptionName' element.");
}
- System.err.println("********* Created TC:"+this);
}
@@ -75,5 +74,5 @@ public class TopicConfig extends Configu
}
return response;
- }
+ }
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/configuration/VirtualHostConfiguration.java Fri Aug 3 12:13:32 2012
@@ -32,7 +32,6 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
-import org.apache.qpid.server.store.MemoryMessageStoreFactory;
import java.util.ArrayList;
import java.util.Arrays;
@@ -103,14 +102,14 @@ public class VirtualHostConfiguration ex
return getConfig().subset("store");
}
- public String getMessageStoreFactoryClass()
+ public String getMessageStoreClass()
{
- return getStringValue("store.factoryclass", MemoryMessageStoreFactory.class.getName());
+ return getStringValue("store.class", MemoryMessageStore.class.getName());
}
- public void setMessageStoreFactoryClass(String storeFactoryClass)
+ public void setMessageStoreClass(String storeFactoryClass)
{
- getConfig().setProperty("store.factoryclass", storeFactoryClass);
+ getConfig().setProperty("store.class", storeFactoryClass);
}
public List getExchanges()
@@ -271,7 +270,7 @@ public class VirtualHostConfiguration ex
public Long getMinimumAlertRepeatGap()
{
- return getLongValue("queues.minimumAlertRepeatGap");
+ return getLongValue("queues.minimumAlertRepeatGap", ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap());
}
public long getCapacity()
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/ConnectionRegistry.java Fri Aug 3 12:13:32 2012
@@ -22,13 +22,12 @@ package org.apache.qpid.server.connectio
import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.transport.TransportException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -37,6 +36,8 @@ public class ConnectionRegistry implemen
private List<AMQConnectionModel> _registry = new CopyOnWriteArrayList<AMQConnectionModel>();
private Logger _logger = Logger.getLogger(ConnectionRegistry.class);
+ private final Collection<RegistryChangeListener> _listeners =
+ new ArrayList<RegistryChangeListener>();
public void initialise()
{
@@ -62,34 +63,77 @@ public class ConnectionRegistry implemen
}
}
- public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
+ private void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message)
{
try
{
connection.close(cause, message);
}
- catch (TransportException e)
+ catch (Exception e)
{
- _logger.warn("Error closing connection:" + e.getMessage());
- }
- catch (AMQException e)
- {
- _logger.warn("Error closing connection:" + e.getMessage());
+ _logger.warn("Exception closing connection", e);
}
}
public void registerConnection(AMQConnectionModel connnection)
{
- _registry.add(connnection);
+ synchronized (this)
+ {
+ _registry.add(connnection);
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.connectionRegistered(connnection);
+ }
+ }
+ }
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.connectionRegistered(connnection);
+ }
+ }
}
public void deregisterConnection(AMQConnectionModel connnection)
{
- _registry.remove(connnection);
+ synchronized (this)
+ {
+ _registry.remove(connnection);
+
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.connectionUnregistered(connnection);
+ }
+ }
+ }
+
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.connectionUnregistered(connnection);
+ }
+ }
+ }
+
+ public void addRegistryChangeListener(RegistryChangeListener listener)
+ {
+ synchronized (_listeners)
+ {
+ _listeners.add(listener);
+ }
}
public List<AMQConnectionModel> getConnections()
{
- return new ArrayList<AMQConnectionModel>(_registry);
+ synchronized (this)
+ {
+ return new ArrayList<AMQConnectionModel>(_registry);
+ }
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/connection/IConnectionRegistry.java Fri Aug 3 12:13:32 2012
@@ -37,11 +37,18 @@ public interface IConnectionRegistry
public void close(String replyText) throws AMQException;
- public void closeConnection(AMQConnectionModel connection, AMQConstant cause, String message);
-
public List<AMQConnectionModel> getConnections();
public void registerConnection(AMQConnectionModel connnection);
public void deregisterConnection(AMQConnectionModel connnection);
+
+ void addRegistryChangeListener(RegistryChangeListener listener);
+
+ interface RegistryChangeListener
+ {
+ void connectionRegistered(AMQConnectionModel connection);
+ void connectionUnregistered(AMQConnectionModel connection);
+
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org