You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [18/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp...
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java Fri Aug 3 12:13:32 2012
@@ -20,17 +20,11 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageMetaData_0_10;
-import org.apache.qpid.server.message.MessageTransferMessage;
-import org.apache.qpid.server.txn.RollbackOnlyDtxException;
-import org.apache.qpid.server.txn.TimeoutDtxException;
-import static org.apache.qpid.util.Serial.gt;
-
import java.security.Principal;
import java.text.MessageFormat;
import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -40,18 +34,16 @@ import java.util.SortedMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
-
import javax.security.auth.Subject;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.ProtocolEngine;
+import org.apache.qpid.server.TransactionTimeoutHelper;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -63,7 +55,10 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.subjects.ChannelLogSubject;
+import org.apache.qpid.server.message.InboundMessage;
+import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageReference;
+import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -82,20 +77,25 @@ import org.apache.qpid.server.txn.Incorr
import org.apache.qpid.server.txn.JoinAndResumeDtxException;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.NotAssociatedDtxException;
+import org.apache.qpid.server.txn.RollbackOnlyDtxException;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.txn.SuspendAndFailDtxException;
+import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ServerSession extends Session
- implements AuthorizationHolder, SessionConfig,
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.util.Serial.gt;
+
+public class ServerSession extends Session
+ implements AuthorizationHolder, SessionConfig,
AMQSessionModel, LogSubject, AsyncAutoCommitTransaction.FutureRecorder
{
private static final Logger _logger = LoggerFactory.getLogger(ServerSession.class);
-
+
private static final String NULL_DESTINTATION = UUID.randomUUID().toString();
private static final int PRODUCER_CREDIT_TOPUP_THRESHOLD = 1 << 30;
private static final int UNFINISHED_COMMAND_QUEUE_THRESHOLD = 500;
@@ -105,7 +105,7 @@ public class ServerSession extends Sessi
private long _createTime = System.currentTimeMillis();
private LogActor _actor = GenericActor.getInstance(this);
- private final Set<AMQQueue> _blockingQueues = new ConcurrentSkipListSet<AMQQueue>();
+ private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
private final AtomicBoolean _blocking = new AtomicBoolean(false);
private ChannelLogSubject _logSubject;
@@ -145,6 +145,8 @@ public class ServerSession extends Sessi
private final List<Task> _taskList = new CopyOnWriteArrayList<Task>();
+ private final TransactionTimeoutHelper _transactionTimeoutHelper;
+
ServerSession(Connection connection, SessionDelegate delegate, Binary name, long expiry)
{
this(connection, delegate, name, expiry, ((ServerConnection)connection).getConfig());
@@ -158,6 +160,8 @@ public class ServerSession extends Sessi
_logSubject = new ChannelLogSubject(this);
_id = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
+
+ _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject);
}
protected void setState(State state)
@@ -167,9 +171,19 @@ public class ServerSession extends Sessi
if (state == State.OPEN)
{
_actor.message(ChannelMessages.CREATE());
+ if(_blocking.get())
+ {
+ invokeBlock();
+ }
}
}
+ private void invokeBlock()
+ {
+ invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
+ invoke(new MessageStop(""));
+ }
+
private ConfigStore getConfigStore()
{
return getConnectionConfig().getConfigStore();
@@ -455,7 +469,7 @@ public class ServerSession extends Sessi
{
return _transaction.isTransactional();
}
-
+
public boolean inTransaction()
{
return isTransactional() && _txnUpdateTime.get() > 0 && _transaction.getTransactionStartTime() > 0;
@@ -621,16 +635,26 @@ public class ServerSession extends Sessi
return _txnRejects.get();
}
+ public int getChannelId()
+ {
+ return getChannel();
+ }
+
public Long getTxnCount()
{
return _txnCount.get();
}
+ public Long getTxnStart()
+ {
+ return _txnStarts.get();
+ }
+
public Principal getAuthorizedPrincipal()
{
return getConnection().getAuthorizedPrincipal();
}
-
+
public Subject getAuthorizedSubject()
{
return getConnection().getAuthorizedSubject();
@@ -661,7 +685,8 @@ public class ServerSession extends Sessi
return (VirtualHost) _connectionConfig.getVirtualHost();
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
return _id;
}
@@ -755,63 +780,85 @@ public class ServerSession extends Sessi
long openTime = currentTime - _transaction.getTransactionStartTime();
long idleTime = currentTime - _txnUpdateTime.get();
- // Log a warning on idle or open transactions
- if (idleWarn > 0L && idleTime > idleWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.IDLE_TXN(idleTime));
- _logger.warn("IDLE TRANSACTION ALERT " + getLogSubject().toString() + " " + idleTime + " ms");
- }
- else if (openWarn > 0L && openTime > openWarn)
- {
- CurrentActor.get().message(getLogSubject(), ChannelMessages.OPEN_TXN(openTime));
- _logger.warn("OPEN TRANSACTION ALERT " + getLogSubject().toString() + " " + openTime + " ms");
- }
-
- // Close connection for idle or open transactions that have timed out
- if (idleClose > 0L && idleTime > idleClose)
+ _transactionTimeoutHelper.logIfNecessary(idleTime, idleWarn, ChannelMessages.IDLE_TXN(idleTime),
+ TransactionTimeoutHelper.IDLE_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(idleTime, idleClose))
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Idle transaction timed out");
+ return;
}
- else if (openClose > 0L && openTime > openClose)
+
+ _transactionTimeoutHelper.logIfNecessary(openTime, openWarn, ChannelMessages.OPEN_TXN(openTime),
+ TransactionTimeoutHelper.OPEN_TRANSACTION_ALERT);
+ if (_transactionTimeoutHelper.isTimedOut(openTime, openClose))
{
getConnectionModel().closeSession(this, AMQConstant.RESOURCE_ERROR, "Open transaction timed out");
+ return;
}
}
}
public void block(AMQQueue queue)
{
+ block(queue, queue.getName());
+ }
+
+ public void block()
+ {
+ block(this, "** All Queues **");
+ }
- if(_blockingQueues.add(queue))
- {
- if(_blocking.compareAndSet(false,true))
+ private void block(Object queue, String name)
+ {
+ synchronized (_blockingEntities)
+ {
+ if(_blockingEntities.add(queue))
{
- invoke(new MessageSetFlowMode("", MessageFlowMode.CREDIT));
- invoke(new MessageStop(""));
- _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(queue.getNameShortString().toString()));
- }
+
+ if(_blocking.compareAndSet(false,true))
+ {
+ if(getState() == State.OPEN)
+ {
+ invokeBlock();
+ }
+ _actor.message(_logSubject, ChannelMessages.FLOW_ENFORCED(name));
+ }
+ }
}
}
public void unblock(AMQQueue queue)
{
- if(_blockingQueues.remove(queue) && _blockingQueues.isEmpty())
+ unblock((Object)queue);
+ }
+
+ public void unblock()
+ {
+ unblock(this);
+ }
+
+ private void unblock(Object queue)
+ {
+ synchronized(_blockingEntities)
{
- if(_blocking.compareAndSet(true,false) && !isClosing())
+ if(_blockingEntities.remove(queue) && _blockingEntities.isEmpty())
{
+ if(_blocking.compareAndSet(true,false) && !isClosing())
+ {
- _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
- MessageFlow mf = new MessageFlow();
- mf.setUnit(MessageCreditUnit.MESSAGE);
- mf.setDestination("");
- _outstandingCredit.set(Integer.MAX_VALUE);
- mf.setValue(Integer.MAX_VALUE);
- invoke(mf);
+ _actor.message(_logSubject, ChannelMessages.FLOW_REMOVED());
+ MessageFlow mf = new MessageFlow();
+ mf.setUnit(MessageCreditUnit.MESSAGE);
+ mf.setDestination("");
+ _outstandingCredit.set(Integer.MAX_VALUE);
+ mf.setValue(Integer.MAX_VALUE);
+ invoke(mf);
+ }
}
}
}
@@ -1020,7 +1067,12 @@ public class ServerSession extends Sessi
public int compareTo(AMQSessionModel session)
{
- return getId().compareTo(session.getId());
+ return getQMFId().compareTo(session.getQMFId());
}
+ @Override
+ public int getConsumerCount()
+ {
+ return _subscriptions.values().size();
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java Fri Aug 3 12:13:32 2012
@@ -99,9 +99,9 @@ public class ServerSessionDelegate exten
Object newOutstanding = ((ServerSession)session).getAsyncCommandMark();
if(newOutstanding == null || newOutstanding == asyncCommandMark)
{
- session.processed(method);
+ session.processed(method);
}
-
+
if(newOutstanding != null)
{
((ServerSession)session).completeAsyncCommands();
@@ -240,13 +240,13 @@ public class ServerSessionDelegate exten
}
FlowCreditManager_0_10 creditManager = new WindowCreditManager(0L,0L);
-
+
FilterManager filterManager = null;
- try
+ try
{
filterManager = FilterManagerFactory.createManager(method.getArguments());
- }
- catch (AMQException amqe)
+ }
+ catch (AMQException amqe)
{
exception(session, method, ExecutionErrorCode.ILLEGAL_ARGUMENT, "Exception Creating FilterManager");
return;
@@ -257,7 +257,7 @@ public class ServerSessionDelegate exten
method.getAcceptMode(),
method.getAcquireMode(),
MessageFlowMode.WINDOW,
- creditManager,
+ creditManager,
filterManager,
method.getArguments());
@@ -297,13 +297,13 @@ public class ServerSessionDelegate exten
final MessageMetaData_0_10 messageMetaData = new MessageMetaData_0_10(xfr);
messageMetaData.setConnectionReference(((ServerSession)ssn).getReference());
-
+
if (!getVirtualHost(ssn).getSecurityManager().authorisePublish(messageMetaData.isImmediate(), messageMetaData.getRoutingKey(), exchange.getName()))
{
ExecutionErrorCode errorCode = ExecutionErrorCode.UNAUTHORIZED_ACCESS;
String description = "Permission denied: exchange-name '" + exchange.getName() + "'";
exception(ssn, xfr, errorCode, description);
-
+
return;
}
@@ -807,7 +807,7 @@ public class ServerSessionDelegate exten
}
}
- // TODO decouple AMQException and AMQConstant error codes
+ // TODO decouple AMQException and AMQConstant error codes
private void exception(Session session, Method method, AMQException exception, String message)
{
ExecutionErrorCode errorCode = ExecutionErrorCode.INTERNAL_ERROR;
@@ -823,7 +823,7 @@ public class ServerSessionDelegate exten
}
}
String description = message + "': " + exception.getMessage();
-
+
exception(session, method, errorCode, description);
}
@@ -1226,11 +1226,7 @@ public class ServerSessionDelegate exten
try
{
queue = createQueue(queueName, method, virtualHost, (ServerSession)session);
- if(method.getExclusive())
- {
- queue.setExclusive(true);
- }
- else if(method.getAutoDelete())
+ if(!method.getExclusive() && method.getAutoDelete())
{
queue.setDeleteOnNoConsumers(true);
}
@@ -1349,9 +1345,9 @@ public class ServerSessionDelegate exten
+ " as exclusive queue with same name "
+ "declared on another session";
ExecutionErrorCode errorCode = ExecutionErrorCode.RESOURCE_LOCKED;
-
+
exception(session, method, errorCode, description);
-
+
return;
}
}
@@ -1389,7 +1385,7 @@ public class ServerSessionDelegate exten
{
String owner = body.getExclusive() ? session.getClientID() : null;
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, body.getDurable(), owner,
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName()), queueName, body.getDurable(), owner,
body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments());
return queue;
@@ -1436,7 +1432,7 @@ public class ServerSessionDelegate exten
else
{
VirtualHost virtualHost = getVirtualHost(session);
-
+
try
{
queue.delete();
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/AsyncAutoCommitTransaction.java Fri Aug 3 12:13:32 2012
@@ -44,11 +44,16 @@ import java.util.List;
*/
public class AsyncAutoCommitTransaction implements ServerTransaction
{
+ static final String QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE = "qpid.strict_order_with_mixed_delivery_mode";
+
protected static final Logger _logger = Logger.getLogger(AsyncAutoCommitTransaction.class);
private final MessageStore _messageStore;
private final FutureRecorder _futureRecorder;
+ //Set true to ensure strict ordering when enqueuing messages with mixed delivery mode, i.e. disable async persistence
+ private boolean _strictOrderWithMixedDeliveryMode = Boolean.getBoolean(QPID_STRICT_ORDER_WITH_MIXED_DELIVERY_MODE);
+
public static interface FutureRecorder
{
public void recordFuture(StoreFuture future, Action action);
@@ -129,6 +134,23 @@ public class AsyncAutoCommitTransaction
}
}
+ private void addEnqueueFuture(final StoreFuture future, final Action action, boolean persistent)
+ {
+ if(action != null)
+ {
+ // For persistent messages, do not synchronously invoke postCommit even if the future is completed.
+ // Otherwise, postCommit (which actually does the enqueuing) might be called on successive messages out of order.
+ if(future.isComplete() && !persistent && !_strictOrderWithMixedDeliveryMode)
+ {
+ action.postCommit();
+ }
+ else
+ {
+ _futureRecorder.recordFuture(future, action);
+ }
+ }
+ }
+
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
Transaction txn = null;
@@ -203,7 +225,7 @@ public class AsyncAutoCommitTransaction
{
future = StoreFuture.IMMEDIATE_FUTURE;
}
- addFuture(future, postTransactionAction);
+ addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
}
catch (AMQException e)
@@ -257,7 +279,7 @@ public class AsyncAutoCommitTransaction
{
future = StoreFuture.IMMEDIATE_FUTURE;
}
- addFuture(future, postTransactionAction);
+ addEnqueueFuture(future, postTransactionAction, message.isPersistent());
postTransactionAction = null;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxBranch.java Fri Aug 3 12:13:32 2012
@@ -98,12 +98,29 @@ public class DtxBranch
public void setTimeout(long timeout)
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Setting timeout to " + timeout + "s for DtxBranch " + _xid);
+ }
+
if(_timeoutFuture != null)
{
- _timeoutFuture.cancel(false);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Attempting to cancel previous timeout task future for DtxBranch " + _xid);
+ }
+
+ boolean succeeded = _timeoutFuture.cancel(false);
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Cancelling previous timeout task " + (succeeded ? "succeeded" : "failed")
+ + " for DtxBranch " + _xid);
+ }
}
+
_timeout = timeout;
- _expiration = timeout == 0 ? 0 : System.currentTimeMillis() + timeout;
+ _expiration = timeout == 0 ? 0 : System.currentTimeMillis() + (1000 * timeout);
if(_timeout == 0)
{
@@ -111,10 +128,23 @@ public class DtxBranch
}
else
{
- _timeoutFuture = _vhost.scheduleTask(_timeout, new Runnable()
+ long delay = 1000*_timeout;
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Scheduling timeout and rollback after " + delay/1000 +
+ "s for DtxBranch " + _xid);
+ }
+
+ _timeoutFuture = _vhost.scheduleTask(delay, new Runnable()
{
public void run()
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Timing out DtxBranch " + _xid);
+ }
+
setState(State.TIMEDOUT);
try
{
@@ -122,8 +152,7 @@ public class DtxBranch
}
catch (AMQStoreException e)
{
- _logger.error("Unexpected error when attempting to rollback XA transaction ("+
- _xid + ") due to timeout", e);
+ _logger.error("Unexpected error when attempting to rollback DtxBranch "+ _xid + " due to timeout", e);
throw new RuntimeException(e);
}
}
@@ -199,6 +228,10 @@ public class DtxBranch
public void prepare() throws AMQStoreException
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Performing prepare for DtxBranch " + _xid);
+ }
Transaction txn = _store.newTransaction();
txn.recordXid(_xid.getFormat(),
@@ -213,12 +246,27 @@ public class DtxBranch
public synchronized void rollback() throws AMQStoreException
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Performing rollback for DtxBranch " + _xid);
+ }
+
if(_timeoutFuture != null)
{
- _timeoutFuture.cancel(false);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Attempting to cancel previous timeout task future for DtxBranch " + _xid);
+ }
+
+ boolean succeeded = _timeoutFuture.cancel(false);
_timeoutFuture = null;
- }
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Cancelling previous timeout task " + (succeeded ? "succeeded" : "failed")
+ + " for DtxBranch " + _xid);
+ }
+ }
if(_transaction != null)
{
@@ -240,10 +288,26 @@ public class DtxBranch
public void commit() throws AMQStoreException
{
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Performing commit for DtxBranch " + _xid);
+ }
+
if(_timeoutFuture != null)
{
- _timeoutFuture.cancel(false);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Attempting to cancel previous timeout task future for DtxBranch " + _xid);
+ }
+
+ boolean succeeded = _timeoutFuture.cancel(false);
_timeoutFuture = null;
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Cancelling previous timeout task " + (succeeded ? "succeeded" : "failed")
+ + " for DtxBranch " + _xid);
+ }
}
if(_transaction == null)
@@ -342,7 +406,7 @@ public class DtxBranch
}
catch(AMQStoreException e)
{
- _logger.error("Error while closing XA branch", e);
+ _logger.error("Error while closing DtxBranch " + _xid, e);
}
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/DtxRegistry.java Fri Aug 3 12:13:32 2012
@@ -37,7 +37,7 @@ public class DtxRegistry
private static final class ComparableXid
{
private final Xid _xid;
-
+
private ComparableXid(Xid xid)
{
_xid = xid;
@@ -58,7 +58,7 @@ public class DtxRegistry
ComparableXid that = (ComparableXid) o;
return compareBytes(_xid.getBranchId(), that._xid.getBranchId())
- && compareBytes(_xid.getGlobalId(), that._xid.getGlobalId());
+ && compareBytes(_xid.getGlobalId(), that._xid.getGlobalId());
}
private static boolean compareBytes(byte[] a, byte[] b)
@@ -94,7 +94,7 @@ public class DtxRegistry
return result;
}
}
-
+
public synchronized DtxBranch getBranch(Xid xid)
{
return _branches.get(new ComparableXid(xid));
@@ -116,7 +116,7 @@ public class DtxRegistry
return (_branches.remove(new ComparableXid(branch.getXid())) != null);
}
- public void commit(Xid id, boolean onePhase)
+ public synchronized void commit(Xid id, boolean onePhase)
throws IncorrectDtxStateException, UnknownDtxBranchException, AMQStoreException, RollbackOnlyDtxException, TimeoutDtxException
{
DtxBranch branch = getBranch(id);
@@ -204,7 +204,7 @@ public class DtxRegistry
}
}
- public void rollback(Xid id)
+ public synchronized void rollback(Xid id)
throws IncorrectDtxStateException,
UnknownDtxBranchException,
AMQStoreException, TimeoutDtxException
@@ -318,6 +318,7 @@ public class DtxRegistry
branch.disassociateSession(session);
}
}
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransaction.java Fri Aug 3 12:13:32 2012
@@ -1,4 +1,3 @@
-package org.apache.qpid.server.txn;
/*
*
* Licensed to the Apache Software Foundation (ASF) under one
@@ -19,7 +18,9 @@ package org.apache.qpid.server.txn;
* under the License.
*
*/
+package org.apache.qpid.server.txn;
+import org.apache.qpid.server.store.StoreFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -50,6 +51,7 @@ public class LocalTransaction implements
private volatile Transaction _transaction;
private MessageStore _transactionLog;
private long _txnStartTime = 0L;
+ private StoreFuture _asyncTran;
public LocalTransaction(MessageStore transactionLog)
{
@@ -68,11 +70,13 @@ public class LocalTransaction implements
public void addPostTransactionAction(Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
}
public void dequeue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if(message.isPersistent() && queue.isDurable())
@@ -98,6 +102,7 @@ public class LocalTransaction implements
public void dequeue(Collection<QueueEntry> queueEntries, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
try
@@ -131,10 +136,7 @@ public class LocalTransaction implements
{
try
{
- for(Action action : _postTransactionActions)
- {
- action.onRollback();
- }
+ doRollbackActions();
}
finally
{
@@ -151,7 +153,7 @@ public class LocalTransaction implements
}
finally
{
- resetDetails();
+ resetDetails();
}
}
@@ -176,6 +178,7 @@ public class LocalTransaction implements
public void enqueue(BaseQueue queue, EnqueableMessage message, Action postTransactionAction)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if(message.isPersistent() && queue.isDurable())
@@ -201,6 +204,7 @@ public class LocalTransaction implements
public void enqueue(List<? extends BaseQueue> queues, EnqueableMessage message, Action postTransactionAction, long currentTime)
{
+ sync();
_postTransactionActions.add(postTransactionAction);
if (_txnStartTime == 0L)
@@ -239,11 +243,13 @@ public class LocalTransaction implements
public void commit()
{
+ sync();
commit(null);
}
public void commit(Runnable immediateAction)
{
+ sync();
try
{
if(_transaction != null)
@@ -256,29 +262,137 @@ public class LocalTransaction implements
immediateAction.run();
}
- for(int i = 0; i < _postTransactionActions.size(); i++)
+ doPostTransactionActions();
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to commit transaction", e);
+
+ doRollbackActions();
+ throw new RuntimeException("Failed to commit transaction", e);
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ private void doRollbackActions()
+ {
+ for(Action action : _postTransactionActions)
+ {
+ action.onRollback();
+ }
+ }
+
+ public StoreFuture commitAsync(final Runnable deferred)
+ {
+ sync();
+ try
+ {
+ StoreFuture future = StoreFuture.IMMEDIATE_FUTURE;
+ if(_transaction != null)
{
- _postTransactionActions.get(i).postCommit();
+ future = new StoreFuture()
+ {
+ private volatile boolean _completed = false;
+ private StoreFuture _underlying = _transaction.commitTranAsync();
+
+ @Override
+ public boolean isComplete()
+ {
+ return _completed || checkUnderlyingCompletion();
+ }
+
+ @Override
+ public void waitForCompletion()
+ {
+ if(!_completed)
+ {
+ _underlying.waitForCompletion();
+ checkUnderlyingCompletion();
+ }
+ }
+
+ private synchronized boolean checkUnderlyingCompletion()
+ {
+ if(!_completed && _underlying.isComplete())
+ {
+ completeDeferredWork();
+ _completed = true;
+ }
+ return _completed;
+
+ }
+
+ private void completeDeferredWork()
+ {
+ try
+ {
+ doPostTransactionActions();
+ deferred.run();
+
+ }
+ catch (Exception e)
+ {
+ _logger.error("Failed to commit transaction", e);
+
+ doRollbackActions();
+ throw new RuntimeException("Failed to commit transaction", e);
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ };
+ _asyncTran = future;
}
+ else
+ {
+ try
+ {
+ doPostTransactionActions();
+
+ deferred.run();
+ }
+ finally
+ {
+ resetDetails();
+ }
+ }
+
+ return future;
}
catch (Exception e)
{
_logger.error("Failed to commit transaction", e);
-
- for(Action action : _postTransactionActions)
+ try
{
- action.onRollback();
+ doRollbackActions();
+ }
+ finally
+ {
+ resetDetails();
}
throw new RuntimeException("Failed to commit transaction", e);
}
- finally
+
+
+ }
+
+ private void doPostTransactionActions()
+ {
+ for(int i = 0; i < _postTransactionActions.size(); i++)
{
- resetDetails();
+ _postTransactionActions.get(i).postCommit();
}
}
public void rollback()
{
+ sync();
try
{
if(_transaction != null)
@@ -295,10 +409,7 @@ public class LocalTransaction implements
{
try
{
- for(Action action : _postTransactionActions)
- {
- action.onRollback();
- }
+ doRollbackActions();
}
finally
{
@@ -306,9 +417,19 @@ public class LocalTransaction implements
}
}
}
-
+
+ public void sync()
+ {
+ if(_asyncTran != null)
+ {
+ _asyncTran.waitForCompletion();
+ _asyncTran = null;
+ }
+ }
+
private void resetDetails()
{
+ _asyncTran = null;
_transaction = null;
_postTransactionActions.clear();
_txnStartTime = 0L;
Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost:r1333988-1368650
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/HouseKeepingTask.java Fri Aug 3 12:13:32 2012
@@ -44,9 +44,9 @@ public abstract class HouseKeepingTask i
final public void run()
{
- // Don't need to undo this as this is a thread pool thread so will
- // always go through here before we do any real work.
+ String originalThreadName = Thread.currentThread().getName();
Thread.currentThread().setName(_name);
+
CurrentActor.set(new AbstractActor(_rootLogger)
{
@Override
@@ -67,6 +67,9 @@ public abstract class HouseKeepingTask i
finally
{
CurrentActor.remove();
+
+ // eagerly revert the thread name to make thread dumps more meaningful if captured after task has finished
+ Thread.currentThread().setName(originalThreadName);
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/State.java Fri Aug 3 12:13:32 2012
@@ -25,5 +25,7 @@ public enum State
INITIALISING,
ACTIVE,
PASSIVE,
- STOPPED
+ STOPPED,
+ /** Terminal state that signifies the virtual host has experienced an unexpected condition. */
+ ERRORED
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java Fri Aug 3 12:13:32 2012
@@ -32,7 +32,6 @@ import org.apache.qpid.server.connection
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.federation.BrokerLink;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -62,10 +61,10 @@ public interface VirtualHost extends Dur
void close();
- ManagedObject getManagedObject();
-
UUID getBrokerId();
+ UUID getId();
+
void scheduleHouseKeepingTask(long period, HouseKeepingTask task);
long getHouseKeepingTaskCount();
@@ -74,7 +73,7 @@ public interface VirtualHost extends Dur
int getHouseKeepingPoolSize();
- void setHouseKeepingPoolSize(int newSize);
+ void setHouseKeepingPoolSize(int newSize);
int getHouseKeepingActiveCount();
@@ -102,4 +101,8 @@ public interface VirtualHost extends Dur
ScheduledFuture<?> scheduleTask(long delay, Runnable timeoutTask);
State getState();
+
+ /** Blocks this virtual host's connections; VirtualHostImpl calls this on the store's PERSISTENT_MESSAGE_SIZE_OVERFULL event. */
+ void block();
+
+ /** Reverses {@link #block()}; VirtualHostImpl calls this on the store's PERSISTENT_MESSAGE_SIZE_UNDERFULL event. */
+ void unblock();
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java Fri Aug 3 12:13:32 2012
@@ -76,15 +76,12 @@ public class VirtualHostConfigRecoveryHa
private final VirtualHost _virtualHost;
- private MessageStoreLogSubject _logSubject;
-
- private MessageStore _store;
-
private final Map<String, Integer> _queueRecoveries = new TreeMap<String, Integer>();
- private Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
- private Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
-
+ private final Map<Long, AbstractServerMessageImpl> _recoveredMessages = new HashMap<Long, AbstractServerMessageImpl>();
+ private final Map<Long, StoredMessage> _unusedMessages = new HashMap<Long, StoredMessage>();
+ private MessageStoreLogSubject _logSubject;
+ private MessageStore _store;
public VirtualHostConfigRecoveryHandler(VirtualHost virtualHost)
{
@@ -100,7 +97,7 @@ public class VirtualHostConfigRecoveryHa
return this;
}
- public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments)
+ public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId)
{
try
{
@@ -111,6 +108,17 @@ public class VirtualHostConfigRecoveryHa
q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
FieldTable.convertToMap(arguments));
_virtualHost.getQueueRegistry().registerQueue(q);
+
+ if (alternateExchangeId != null)
+ {
+ Exchange altExchange = _virtualHost.getExchangeRegistry().getExchange(alternateExchangeId);
+ if (altExchange == null)
+ {
+ _logger.error("Unknown exchange id " + alternateExchangeId + ", cannot set alternate exchange on queue with id " + id);
+ return;
+ }
+ q.setAlternateExchange(altExchange);
+ }
}
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_START(queueName, true));
@@ -120,12 +128,12 @@ public class VirtualHostConfigRecoveryHa
}
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ throw new RuntimeException("Error recovering queue uuid " + id + " name " + queueName, e);
}
}
- public ExchangeRecoveryHandler completeQueueRecovery()
+ @Override
+ public BindingRecoveryHandler completeQueueRecovery()
{
return this;
}
@@ -145,19 +153,17 @@ public class VirtualHostConfigRecoveryHa
}
catch (AMQException e)
{
- // TODO
- throw new RuntimeException(e);
+ throw new RuntimeException("Error recovering exchange uuid " + id + " name " + exchangeName, e);
}
}
- public BindingRecoveryHandler completeExchangeRecovery()
+ public QueueRecoveryHandler completeExchangeRecovery()
{
return this;
}
public StoredMessageRecoveryHandler begin()
{
- // TODO - log begin
return this;
}
@@ -182,7 +188,6 @@ public class VirtualHostConfigRecoveryHa
public void completeMessageRecovery()
{
- //TODO - log end
}
public BridgeRecoveryHandler brokerLink(final UUID id,
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java Fri Aug 3 12:13:32 2012
@@ -20,12 +20,20 @@
*/
package org.apache.qpid.server.virtualhost;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.AMQBrokerManagerMBean;
import org.apache.qpid.server.binding.BindingFactory;
import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -45,11 +53,10 @@ import org.apache.qpid.server.federation
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.logging.subjects.MessageStoreLogSubject;
-import org.apache.qpid.server.management.AMQManagedObject;
-import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
+import org.apache.qpid.server.protocol.v1_0.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.DefaultQueueRegistry;
@@ -59,37 +66,25 @@ import org.apache.qpid.server.security.S
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
+import org.apache.qpid.server.store.HAMessageStore;
import org.apache.qpid.server.store.MessageStore;
-import org.apache.qpid.server.store.MessageStoreFactory;
import org.apache.qpid.server.store.OperationalLoggingListener;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPlugin;
import org.apache.qpid.server.virtualhost.plugins.VirtualHostPluginFactory;
-import javax.management.JMException;
-import javax.management.NotCompliantMBeanException;
-import javax.management.ObjectName;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.ScheduledThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
-
-
-public class VirtualHostImpl implements VirtualHost
+public class VirtualHostImpl implements VirtualHost, IConnectionRegistry.RegistryChangeListener, EventListener
{
private static final Logger _logger = Logger.getLogger(VirtualHostImpl.class);
private static final int HOUSEKEEPING_SHUTDOWN_TIMEOUT = 5;
- private final UUID _id;
+ private final UUID _qmfId;
private final String _name;
+ private final UUID _id;
+
private final long _createTime = System.currentTimeMillis();
private final ConcurrentHashMap<BrokerLink,BrokerLink> _links = new ConcurrentHashMap<BrokerLink, BrokerLink>();
@@ -104,10 +99,6 @@ public class VirtualHostImpl implements
private final VirtualHostConfiguration _vhostConfig;
- private final VirtualHostMBean _virtualHostMBean;
-
- private final AMQBrokerManagerMBean _brokerMBean;
-
private final QueueRegistry _queueRegistry;
private final ExchangeRegistry _exchangeRegistry;
@@ -122,13 +113,12 @@ public class VirtualHostImpl implements
private final MessageStore _messageStore;
- private State _state = State.INITIALISING;
-
- private boolean _statisticsEnabled = false;
+ private volatile State _state = State.INITIALISING;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final Map<String, LinkRegistry> _linkRegistry = new HashMap<String, LinkRegistry>();
+ private boolean _blocked;
public VirtualHostImpl(IApplicationRegistry appRegistry, VirtualHostConfiguration hostConfig) throws Exception
{
@@ -143,20 +133,21 @@ public class VirtualHostImpl implements
}
_appRegistry = appRegistry;
- _brokerConfig = _appRegistry.getBroker();
+ _brokerConfig = _appRegistry.getBrokerConfig();
_vhostConfig = hostConfig;
_name = _vhostConfig.getName();
_dtxRegistry = new DtxRegistry();
- _id = _appRegistry.getConfigStore().createId();
+ _qmfId = _appRegistry.getConfigStore().createId();
+ _id = UUIDGenerator.generateVhostUUID(_name);
CurrentActor.get().message(VirtualHostMessages.CREATED(_name));
- _virtualHostMBean = new VirtualHostMBean();
_securityManager = new SecurityManager(_appRegistry.getSecurityManager());
_securityManager.configureHostPlugins(_vhostConfig);
_connectionRegistry = new ConnectionRegistry();
+ _connectionRegistry.addRegistryChangeListener(this);
_houseKeepingTasks = new ScheduledThreadPoolExecutor(_vhostConfig.getHouseKeepingThreadCount());
@@ -169,15 +160,16 @@ public class VirtualHostImpl implements
_bindingFactory = new BindingFactory(this);
- _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean);
-
- _messageStore = initialiseMessageStore(hostConfig.getMessageStoreFactoryClass());
+ _messageStore = initialiseMessageStore(hostConfig.getMessageStoreClass());
configureMessageStore(hostConfig);
activateNonHAMessageStore();
initialiseStatistics();
+
+ _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ _messageStore.addEventListener(this, Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
}
public IConnectionRegistry getConnectionRegistry()
@@ -195,6 +187,12 @@ public class VirtualHostImpl implements
return _id;
}
+ @Override
+ public UUID getQMFId()
+ {
+ return _qmfId;
+ }
+
public VirtualHostConfigType getConfigType()
{
return VirtualHostConfigType.getInstance();
@@ -324,20 +322,19 @@ public class VirtualHostImpl implements
}
- private MessageStore initialiseMessageStore(final String messageStoreFactoryClass) throws Exception
+ private MessageStore initialiseMessageStore(final String messageStoreClass) throws Exception
{
- final Class<?> clazz = Class.forName(messageStoreFactoryClass);
+ final Class<?> clazz = Class.forName(messageStoreClass);
final Object o = clazz.newInstance();
- if (!(o instanceof MessageStoreFactory))
+ // the configured class is instantiated directly and used as the store, so it must itself be a MessageStore
+ if (!(o instanceof MessageStore))
{
- throw new ClassCastException("Message store factory class must implement " + MessageStoreFactory.class +
+ throw new ClassCastException("Message store class must implement " + MessageStore.class +
". Class " + clazz + " does not.");
}
- final MessageStoreFactory messageStoreFactory = (MessageStoreFactory) o;
- final MessageStore messageStore = messageStoreFactory.createMessageStore();
- final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, messageStoreFactory.getStoreClassName());
+ final MessageStore messageStore = (MessageStore) o;
+ final MessageStoreLogSubject storeLogSubject = new MessageStoreLogSubject(this, clazz.getSimpleName());
OperationalLoggingListener.listen(messageStore, storeLogSubject);
messageStore.addEventListener(new BeforeActivationListener(), Event.BEFORE_ACTIVATE);
@@ -361,7 +358,10 @@ public class VirtualHostImpl implements
private void activateNonHAMessageStore() throws Exception
{
- _messageStore.activate();
+ if (!(_messageStore instanceof HAMessageStore))
+ {
+ _messageStore.activate();
+ }
}
private void initialiseModel(VirtualHostConfiguration config) throws ConfigurationException, AMQException
@@ -534,16 +534,6 @@ public class VirtualHostImpl implements
CurrentActor.get().message(VirtualHostMessages.CLOSED());
}
- public ManagedObject getBrokerMBean()
- {
- return _brokerMBean;
- }
-
- public ManagedObject getManagedObject()
- {
- return _virtualHostMBean;
- }
-
public UUID getBrokerId()
{
return _appRegistry.getBrokerId();
@@ -558,54 +548,48 @@ public class VirtualHostImpl implements
{
return _bindingFactory;
}
-
+
public void registerMessageDelivered(long messageSize)
{
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
_appRegistry.registerMessageDelivered(messageSize);
}
-
+
public void registerMessageReceived(long messageSize, long timestamp)
{
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
_appRegistry.registerMessageReceived(messageSize, timestamp);
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
_dataDelivered.reset();
_messagesReceived.reset();
_dataReceived.reset();
-
+
for (AMQConnectionModel connection : _connectionRegistry.getConnections())
{
connection.resetStatistics();
@@ -614,25 +598,12 @@ public class VirtualHostImpl implements
public void initialiseStatistics()
{
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _appRegistry.getConfiguration().isStatisticsGenerationVirtualhostsEnabled());
-
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getName());
_dataDelivered = new StatisticsCounter("bytes-delivered-" + getName());
_messagesReceived = new StatisticsCounter("messages-received-" + getName());
_dataReceived = new StatisticsCounter("bytes-received-" + getName());
}
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
-
public BrokerLink createBrokerConnection(UUID id, long createTime, Map<String,String> arguments)
{
BrokerLink blink = new BrokerLink(this, id, createTime, arguments);
@@ -699,107 +670,155 @@ public class VirtualHostImpl implements
return _dtxRegistry;
}
- @Override
public String toString()
{
return _name;
}
- @Override
public State getState()
{
return _state;
}
-
- /**
- * Virtual host JMX MBean class.
- *
- * This has some of the methods implemented from management interface for exchanges. Any
- * Implementation of an Exchange MBean should extend this class.
- */
- public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost
+ public void block()
{
- public VirtualHostMBean() throws NotCompliantMBeanException
+ synchronized (_connectionRegistry)
{
- super(ManagedVirtualHost.class, ManagedVirtualHost.TYPE);
+ if(!_blocked)
+ {
+ _blocked = true;
+ for(AMQConnectionModel conn : _connectionRegistry.getConnections())
+ {
+ conn.block();
+ }
+ }
}
+ }
- public String getObjectInstanceName()
- {
- return ObjectName.quote(_name);
- }
- public String getName()
+ public void unblock()
+ {
+ synchronized (_connectionRegistry)
{
- return _name;
+ if(_blocked)
+ {
+ _blocked = false;
+ for(AMQConnectionModel conn : _connectionRegistry.getConnections())
+ {
+ conn.unblock();
+ }
+ }
}
+ }
- public VirtualHostImpl getVirtualHost()
+ public void connectionRegistered(final AMQConnectionModel connection)
+ {
+ if(_blocked)
{
- return VirtualHostImpl.this;
+ connection.block();
}
}
- private final class BeforeActivationListener implements EventListener
+ public void connectionUnregistered(final AMQConnectionModel connection)
{
- @Override
- public void event(Event event)
+ }
+
+ public void event(final Event event)
+ {
+ switch(event)
{
- try
- {
- _exchangeRegistry.initialise();
- initialiseModel(_vhostConfig);
- } catch (Exception e)
- {
- throw new RuntimeException("Failed to initialise virtual host after state change", e);
- }
+ case PERSISTENT_MESSAGE_SIZE_OVERFULL:
+ block();
+ break;
+ case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
+ unblock();
+ break;
}
}
- private final class AfterActivationListener implements EventListener
+ private final class BeforeActivationListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ try
+ {
+ _exchangeRegistry.initialise();
+ initialiseModel(_vhostConfig);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failed to initialise virtual host after state change", e);
+ }
+ }
+ }
+
+ private final class AfterActivationListener implements EventListener
+ {
+ @Override
+ public void event(Event event)
+ {
+ State finalState = State.ERRORED;
+
+ try
+ {
+ initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ finalState = State.ACTIVE;
+ }
+ finally
+ {
+ _state = finalState;
+ reportIfError(_state);
+ }
+ }
+ }
+
+ private final class BeforePassivationListener implements EventListener
{
- @Override
public void event(Event event)
{
- initialiseHouseKeeping(_vhostConfig.getHousekeepingCheckPeriod());
+ State finalState = State.ERRORED;
+
try
{
- _brokerMBean.register();
- } catch (JMException e)
+ /* the approach here is not ideal as there is a race condition where a
+ * queue etc could be created while the virtual host is on the way to
+ * the passivated state. However the store state change from MASTER to UNKNOWN
+ * is documented as exceptionally rare.
+ */
+
+ _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
+ removeHouseKeepingTasks();
+
+ _queueRegistry.stopAllAndUnregisterMBeans();
+ _exchangeRegistry.clearAndUnregisterMbeans();
+ _dtxRegistry.close();
+
+ finalState = State.PASSIVE;
+ }
+ finally
{
- throw new RuntimeException("Failed to register virtual host mbean for virtual host " + getName(), e);
+ _state = finalState;
+ reportIfError(_state);
}
-
- _state = State.ACTIVE;
}
+
}
- public class BeforePassivationListener implements EventListener
+ private final class BeforeCloseListener implements EventListener
{
-
@Override
public void event(Event event)
{
- _connectionRegistry.close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
- _brokerMBean.unregister();
- removeHouseKeepingTasks();
-
- _queueRegistry.stopAllAndUnregisterMBeans();
- _exchangeRegistry.clearAndUnregisterMbeans();
- _dtxRegistry.close();
-
- _state = State.PASSIVE;
+ shutdownHouseKeeping();
}
}
- private final class BeforeCloseListener implements EventListener
+ private void reportIfError(State state)
{
- @Override
- public void event(Event event)
+ if (state == State.ERRORED)
{
- _brokerMBean.unregister();
- shutdownHouseKeeping();
+ CurrentActor.get().message(VirtualHostMessages.ERRORED());
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostRegistry.java Fri Aug 3 12:13:32 2012
@@ -22,10 +22,12 @@ package org.apache.qpid.server.virtualho
import org.apache.qpid.common.Closeable;
import org.apache.qpid.server.configuration.ConfigStore;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -37,6 +39,8 @@ public class VirtualHostRegistry impleme
private String _defaultVirtualHostName;
private ApplicationRegistry _applicationRegistry;
+ private final Collection<RegistryChangeListener> _listeners =
+ Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>());
public VirtualHostRegistry(ApplicationRegistry applicationRegistry)
{
@@ -50,11 +54,25 @@ public class VirtualHostRegistry impleme
throw new Exception("Virtual Host with name " + host.getName() + " already registered.");
}
_registry.put(host.getName(),host);
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.virtualHostRegistered(host);
+ }
+ }
}
public synchronized void unregisterVirtualHost(VirtualHost host)
{
_registry.remove(host.getName());
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.virtualHostUnregistered(host);
+ }
+ }
}
public VirtualHost getVirtualHost(String name)
@@ -106,4 +124,17 @@ public class VirtualHostRegistry impleme
}
}
+
+ public static interface RegistryChangeListener
+ {
+ void virtualHostRegistered(VirtualHost virtualHost);
+ void virtualHostUnregistered(VirtualHost virtualHost);
+
+ }
+
+ public void addRegistryChangeListener(RegistryChangeListener listener)
+ {
+ _listeners.add(listener);
+ }
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/virtualhost/plugins/SlowConsumerDetection.java Fri Aug 3 12:13:32 2012
@@ -1,5 +1,6 @@
/*
*
+ * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/BrokerOptionsTest.java Fri Aug 3 12:13:32 2012
@@ -199,4 +199,31 @@ public class BrokerOptionsTest extends Q
_options.setLogWatchFrequency(myFreq);
assertEquals(myFreq, _options.getLogWatchFrequency());
}
+
+ public void testDefaultIncludesPortFor0_10()
+ {
+ assertEquals(Collections.EMPTY_SET, _options.getIncludedPorts(ProtocolInclusion.v0_10));
+ }
+
+ public void testOverriddenIncludesPortFor0_10()
+ {
+ _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT1);
+ assertEquals(Collections.singleton(TEST_PORT1), _options.getIncludedPorts(ProtocolInclusion.v0_10));
+ }
+
+ public void testManyOverriddenIncludedPortFor0_10()
+ {
+ _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT1);
+ _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT2);
+ final Set<Integer> expectedPorts = new HashSet<Integer>(Arrays.asList(new Integer[] {TEST_PORT1, TEST_PORT2}));
+ assertEquals(expectedPorts, _options.getIncludedPorts(ProtocolInclusion.v0_10));
+ }
+
+ public void testDuplicatedOverriddenIncludedPortFor0_10AreSilentlyIgnored()
+ {
+ // register the same port twice: the duplicate must collapse to a single entry
+ _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT1);
+ _options.addIncludedPort(ProtocolInclusion.v0_10, TEST_PORT1);
+ assertEquals(Collections.singleton(TEST_PORT1), _options.getIncludedPorts(ProtocolInclusion.v0_10));
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/MainTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/MainTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/MainTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/MainTest.java Fri Aug 3 12:13:32 2012
@@ -47,6 +47,11 @@ public class MainTest extends QpidTestCa
{
assertEquals(0, options.getExcludedPorts(pe).size());
}
+
+ for(ProtocolInclusion pe : EnumSet.allOf(ProtocolInclusion.class))
+ {
+ assertEquals(0, options.getIncludedPorts(pe).size());
+ }
}
public void testPortOverriddenSingle()
@@ -162,6 +167,20 @@ public class MainTest extends QpidTestCa
assertTrue("Parsed command line didnt pick up help option", main.getCommandLine().hasOption("h"));
}
+ public void testInclude010()
+ {
+ BrokerOptions options = startDummyMain("-p 5678 --include-0-10 5678");
+
+ assertTrue(options.getPorts().contains(5678));
+ assertEquals(1, options.getPorts().size());
+ assertTrue(options.getIncludedPorts(ProtocolInclusion.v0_10).contains(5678));
+ assertEquals(1, options.getIncludedPorts(ProtocolInclusion.v0_10).size());
+ assertEquals(0, options.getIncludedPorts(ProtocolInclusion.v0_9_1).size());
+ assertEquals(0, options.getIncludedPorts(ProtocolInclusion.v0_9).size());
+ assertEquals(0, options.getIncludedPorts(ProtocolInclusion.v0_8).size());
+ assertEquals(0, options.getIncludedPorts(ProtocolInclusion.v1_0).size());
+ }
+
private BrokerOptions startDummyMain(String commandLine)
{
return (new TestMain(commandLine.split("\\s"))).getOptions();
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/MockConnectionConfig.java Fri Aug 3 12:13:32 2012
@@ -25,14 +25,14 @@ import java.util.UUID;
public class MockConnectionConfig implements ConnectionConfig
{
- public MockConnectionConfig(UUID _id, ConnectionConfigType _configType,
+ public MockConnectionConfig(UUID _qmfId, ConnectionConfigType _configType,
ConfiguredObject<ConnectionConfigType, ConnectionConfig> _parent, boolean _durable,
long _createTime, VirtualHostConfig _virtualHost, String _address, Boolean _incoming,
Boolean _systemConnection, Boolean _federationLink, String _authId, String _remoteProcessName,
Integer _remotePID, Integer _remoteParentPID, ConfigStore _configStore, Boolean _shadow)
{
super();
- this._id = _id;
+ this._qmfId = _qmfId;
this._configType = _configType;
this._parent = _parent;
this._durable = _durable;
@@ -50,7 +50,7 @@ public class MockConnectionConfig implem
this._shadow = _shadow;
}
- private UUID _id;
+ private UUID _qmfId;
private ConnectionConfigType _configType;
private ConfiguredObject<ConnectionConfigType, ConnectionConfig> _parent;
private boolean _durable;
@@ -68,9 +68,9 @@ public class MockConnectionConfig implem
private Boolean _shadow;
@Override
- public UUID getId()
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
@Override
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/QueueConfigurationTest.java Fri Aug 3 12:13:32 2012
@@ -176,20 +176,29 @@ public class QueueConfigurationTest exte
assertEquals(1, qConf.getMaximumMessageCount());
}
- public void testGetMinimumAlertRepeatGap() throws ConfigurationException
+ public void testGetMinimumAlertRepeatGap() throws Exception
{
- // Check default value
- QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
- assertEquals(0, qConf.getMinimumAlertRepeatGap());
-
- // Check explicit value
- VirtualHostConfiguration vhostConfig = overrideConfiguration("minimumAlertRepeatGap", 2);
- qConf = new QueueConfiguration("test", vhostConfig);
- assertEquals(2, qConf.getMinimumAlertRepeatGap());
-
- // Check inherited value
- qConf = new QueueConfiguration("test", _fullHostConf);
- assertEquals(1, qConf.getMinimumAlertRepeatGap());
+ try
+ {
+ ApplicationRegistry registry = new TestApplicationRegistry(new ServerConfiguration(_env));
+ ApplicationRegistry.initialise(registry);
+ // Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertEquals(ServerConfiguration.DEFAULT_MINIMUM_ALERT_REPEAT_GAP, qConf.getMinimumAlertRepeatGap());
+
+ // Check explicit value
+ VirtualHostConfiguration vhostConfig = overrideConfiguration("minimumAlertRepeatGap", 2);
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertEquals(2, qConf.getMinimumAlertRepeatGap());
+
+ // Check inherited value
+ qConf = new QueueConfiguration("test", _fullHostConf);
+ assertEquals(1, qConf.getMinimumAlertRepeatGap());
+ }
+ finally
+ {
+ ApplicationRegistry.remove();
+ }
}
public void testSortQueueConfiguration() throws ConfigurationException
@@ -204,6 +213,18 @@ public class QueueConfigurationTest exte
assertEquals("test-sort-key", qConf.getQueueSortKey());
}
+ public void testQueueDescription() throws ConfigurationException
+ {
+ //Check default value
+ QueueConfiguration qConf = new QueueConfiguration("test", _emptyConf);
+ assertNull(qConf.getDescription());
+
+ // Check explicit value
+ final VirtualHostConfiguration vhostConfig = overrideConfiguration("description", "mydescription");
+ qConf = new QueueConfiguration("test", vhostConfig);
+ assertEquals("mydescription", qConf.getDescription());
+ }
+
private VirtualHostConfiguration overrideConfiguration(String property, Object value)
throws ConfigurationException
{
Modified: qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java (original)
+++ qpid/branches/asyncstore/java/broker/src/test/java/org/apache/qpid/server/configuration/ServerConfigurationTest.java Fri Aug 3 12:13:32 2012
@@ -25,6 +25,7 @@ import org.apache.commons.configuration.
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.ConfigurationFileApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
@@ -251,13 +252,13 @@ public class ServerConfigurationTest ext
{
// Check default
_serverConfig.initialise();
- assertEquals(true, _serverConfig.getManagementSSLEnabled());
+ assertEquals(false, _serverConfig.getManagementSSLEnabled());
// Check value we set
- _config.setProperty("management.ssl.enabled", false);
+ _config.setProperty("management.ssl.enabled", true);
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals(false, _serverConfig.getManagementSSLEnabled());
+ assertEquals(true, _serverConfig.getManagementSSLEnabled());
}
public void testGetManagementKeystorePassword() throws ConfigurationException
@@ -286,25 +287,17 @@ public class ServerConfigurationTest ext
assertEquals(false, _serverConfig.getQueueAutoRegister());
}
- public void testGetManagementEnabled() throws ConfigurationException
+ public void testGetJMXManagementEnabled() throws ConfigurationException
{
// Check default
_serverConfig.initialise();
- assertEquals(true, _serverConfig.getManagementEnabled());
+ assertEquals(true, _serverConfig.getJMXManagementEnabled());
// Check value we set
_config.setProperty("management.enabled", false);
_serverConfig = new ServerConfiguration(_config);
_serverConfig.initialise();
- assertEquals(false, _serverConfig.getManagementEnabled());
- }
-
- public void testSetManagementEnabled() throws ConfigurationException
- {
- // Check value we set
- _serverConfig.initialise();
- _serverConfig.setManagementEnabled(false);
- assertEquals(false, _serverConfig.getManagementEnabled());
+ assertEquals(false, _serverConfig.getJMXManagementEnabled());
}
public void testGetManagementRightsInferAllAccess() throws Exception
@@ -401,7 +394,7 @@ public class ServerConfigurationTest ext
{
// Check default
_serverConfig.initialise();
- assertEquals(0, _serverConfig.getMinimumAlertRepeatGap());
+ assertEquals(30000l, _serverConfig.getMinimumAlertRepeatGap());
// Check value we set
_config.setProperty("minimumAlertRepeatGap", 10L);
@@ -1588,6 +1581,168 @@ public class ServerConfigurationTest ext
assertEquals(false, _serverConfig.isAmqp08enabled());
}
+ public void testPortInclude08() throws ConfigurationException
+ {
+ // Check default
+ _serverConfig.initialise();
+ assertEquals(true, _serverConfig.getPortInclude08().isEmpty());
+
+ // Check values we set
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_08, "1");
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_08, "2");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals(2, _serverConfig.getPortInclude08().size());
+ assertTrue(_serverConfig.getPortInclude08().contains("1"));
+ assertTrue(_serverConfig.getPortInclude08().contains("2"));
+ }
+
+ public void testPortInclude09() throws ConfigurationException
+ {
+ // Check default
+ _serverConfig.initialise();
+ assertEquals(true, _serverConfig.getPortInclude09().isEmpty());
+
+ // Check values we set
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_09, "3");
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_09, "4");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals(2, _serverConfig.getPortInclude09().size());
+ assertTrue(_serverConfig.getPortInclude09().contains("3"));
+ assertTrue(_serverConfig.getPortInclude09().contains("4"));
+ }
+
+ public void testPortInclude091() throws ConfigurationException
+ {
+ // Check default
+ _serverConfig.initialise();
+ assertEquals(true, _serverConfig.getPortInclude091().isEmpty());
+
+ // Check values we set
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_091, "5");
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_091, "6");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals(2, _serverConfig.getPortInclude091().size());
+ assertTrue(_serverConfig.getPortInclude091().contains("5"));
+ assertTrue(_serverConfig.getPortInclude091().contains("6"));
+ }
+
+ public void testPortInclude010() throws ConfigurationException
+ {
+ // Check default
+ _serverConfig.initialise();
+ assertEquals(true, _serverConfig.getPortInclude010().isEmpty());
+
+ // Check values we set
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, "7");
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_010, "8");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals(2, _serverConfig.getPortInclude010().size());
+ assertTrue(_serverConfig.getPortInclude010().contains("7"));
+ assertTrue(_serverConfig.getPortInclude010().contains("8"));
+ }
+
+ public void testPortInclude10() throws ConfigurationException
+ {
+ // Check default
+ _serverConfig.initialise();
+ assertEquals(true, _serverConfig.getPortInclude10().isEmpty());
+
+ // Check values we set
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_10, "9");
+ _config.addProperty(ServerConfiguration.CONNECTOR_INCLUDE_10, "10");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals(2, _serverConfig.getPortInclude10().size());
+ assertTrue(_serverConfig.getPortInclude10().contains("9"));
+ assertTrue(_serverConfig.getPortInclude10().contains("10"));
+ }
+
+ public void testGetDefaultSupportedProtocolReply() throws Exception
+ {
+ // Check default
+ _serverConfig.initialise();
+ assertNull("unexpected default value", _serverConfig.getDefaultSupportedProtocolReply());
+
+ // Check values we set
+ _config.addProperty(ServerConfiguration.CONNECTOR_AMQP_SUPPORTED_REPLY, "v0_10");
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals(AmqpProtocolVersion.v0_10, _serverConfig.getDefaultSupportedProtocolReply());
+ }
+
+ public void testDefaultAuthenticationManager() throws Exception
+ {
+ // Check default
+ _serverConfig.initialise();
+ assertNull("unexpected default value", _serverConfig.getDefaultAuthenticationManager());
+
+ // Check values we set
+ String testAuthManager = "myauthmanager";
+ _config.addProperty("security.default-auth-manager", testAuthManager);
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals(testAuthManager, _serverConfig.getDefaultAuthenticationManager());
+ }
+
+ public void testPortAuthenticationMappingsDefault() throws Exception
+ {
+ _serverConfig.initialise();
+ assertEquals("unexpected default number of port/authmanager mappings", 0, _serverConfig.getPortAuthenticationMappings().size());
+ }
+
+ public void testPortAuthenticationMappingsWithSingleMapping() throws Exception
+ {
+ String testAuthManager = "myauthmanager";
+ _config.addProperty("security.port-mappings.port-mapping.port", 1234);
+ _config.addProperty("security.port-mappings.port-mapping.auth-manager", testAuthManager);
+
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+ assertEquals("unexpected number of port/authmanager mappings", 1, _serverConfig.getPortAuthenticationMappings().size());
+ assertEquals("unexpected mapping for port", testAuthManager, _serverConfig.getPortAuthenticationMappings().get(1234));
+ }
+
+ public void testPortAuthenticationMappingsWithManyMapping() throws Exception
+ {
+ String testAuthManager1 = "myauthmanager1";
+ String testAuthManager2 = "myauthmanager2";
+ _config.addProperty("security.port-mappings.port-mapping(-1).port", 1234);
+ _config.addProperty("security.port-mappings.port-mapping.auth-manager", testAuthManager1);
+
+ _config.addProperty("security.port-mappings.port-mapping(-1).port", 2345);
+ _config.addProperty("security.port-mappings.port-mapping.auth-manager", testAuthManager2);
+
+ _serverConfig = new ServerConfiguration(_config);
+ _serverConfig.initialise();
+
+ assertEquals("unexpected number of port/authmanager mappings", 2, _serverConfig.getPortAuthenticationMappings().size());
+ assertEquals("unexpected mapping for port", testAuthManager1, _serverConfig.getPortAuthenticationMappings().get(1234));
+ assertEquals("unexpected mapping for port", testAuthManager2, _serverConfig.getPortAuthenticationMappings().get(2345));
+ }
+
+ public void testPortAuthenticationMappingWithMissingAuthManager() throws Exception
+ {
+ _config.addProperty("security.port-mappings.port-mapping(-1).port", 1234);
+ // no auth manager defined for port
+ _serverConfig = new ServerConfiguration(_config);
+ try
+ {
+ _serverConfig.initialise();
+ fail("Exception not thrown");
+ }
+ catch(ConfigurationException ce)
+ {
+ // PASS
+ assertEquals("Incorrect error message",
+ "Validation error: Each port-mapping must have exactly one port and exactly one auth-manager.",
+ ce.getMessage());
+ }
+ }
+
/**
* Convenience method to output required security preamble for broker config
*/
@@ -1605,7 +1760,6 @@ public class ServerConfigurationTest ext
out.write("\t\t\t\t\t</attribute>\n");
out.write("\t\t\t\t</attributes>\n");
out.write("\t\t\t</principal-database>\n");
- out.write("\t\t\t<jmx-access>/dev/null</jmx-access>\n");
out.write("\t\t</pd-auth-manager>\n");
out.write("\t</security>\n");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org