You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [17/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp...
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/state/AMQStateManager.java Fri Aug 3 12:13:32 2012
@@ -32,7 +32,9 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQMethodListener;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -61,6 +63,18 @@ public class AMQStateManager implements
}
+ /**
+ * Get the ApplicationRegistry associated with this AMQStateManager
+ *
+ * returns the application registry associated with the VirtualHostRegistry of the AMQStateManager
+ *
+ * @return the ApplicationRegistry
+ */
+ public IApplicationRegistry getApplicationRegistry()
+ {
+ return _virtualHostRegistry.getApplicationRegistry();
+ }
+
public AMQState getCurrentState()
{
return _currentState;
@@ -142,4 +156,14 @@ public class AMQStateManager implements
SecurityManager.setThreadSubject(_protocolSession.getAuthorizedSubject());
return _protocolSession;
}
+
+ /**
+ * Get the AuthenticationManager associated with the ProtocolSession of the AMQStateManager
+ *
+ * @return the AuthenticationManager
+ */
+ public AuthenticationManager getAuthenticationManager()
+ {
+ return getApplicationRegistry().getAuthenticationManager(getProtocolSession().getLocalAddress());
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsCounter.java Fri Aug 3 12:13:32 2012
@@ -33,8 +33,7 @@ public class StatisticsCounter
private static final Logger _log = LoggerFactory.getLogger(StatisticsCounter.class);
public static final long DEFAULT_SAMPLE_PERIOD = Long.getLong("qpid.statistics.samplePeriod", 2000L); // 2s
- public static final boolean DISABLE_STATISTICS = Boolean.getBoolean("qpid.statistics.disable");
-
+
private static final String COUNTER = "counter";
private static final AtomicLong _counterIds = new AtomicLong(0L);
@@ -78,11 +77,6 @@ public class StatisticsCounter
public void registerEvent(long value, long timestamp)
{
- if (DISABLE_STATISTICS)
- {
- return;
- }
-
long thisSample = (timestamp / _period);
synchronized (this)
{
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/stats/StatisticsGatherer.java Fri Aug 3 12:13:32 2012
@@ -103,16 +103,4 @@ public interface StatisticsGatherer
* Reset the counters for this, and any child {@link StatisticsGatherer}s.
*/
void resetStatistics();
-
- /**
- * Check if this object has statistics generation enabled.
- *
- * @return true if statistics generation is enabled
- */
- boolean isStatisticsEnabled();
-
- /**
- * Enable or disable statistics generation for this object.
- */
- void setStatisticsEnabled(boolean enabled);
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java Fri Aug 3 12:13:32 2012
@@ -28,20 +28,21 @@ import java.util.UUID;
public interface ConfigurationRecoveryHandler
{
- QueueRecoveryHandler begin(MessageStore store);
+ ExchangeRecoveryHandler begin(MessageStore store);
- public static interface QueueRecoveryHandler
+ public static interface ExchangeRecoveryHandler
{
- void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments);
- ExchangeRecoveryHandler completeQueueRecovery();
+ void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
+ QueueRecoveryHandler completeExchangeRecovery();
}
- public static interface ExchangeRecoveryHandler
+ public static interface QueueRecoveryHandler
{
- void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
- BindingRecoveryHandler completeExchangeRecovery();
+ void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments, UUID alternateExchangeId);
+ BindingRecoveryHandler completeQueueRecovery();
}
+
public static interface BindingRecoveryHandler
{
void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java Fri Aug 3 12:13:32 2012
@@ -40,12 +40,7 @@ import org.apache.qpid.server.util.MapJs
public class ConfiguredObjectHelper
{
- /**
- * Name of queue attribute to store queue creation arguments.
- * <p>
- * This attribute is not defined yet on Queue configured object interface.
- */
- private static final String QUEUE_ARGUMENTS = "ARGUMENTS";
+
private MapJsonSerializer _serializer = new MapJsonSerializer();
@@ -57,14 +52,15 @@ public class ConfiguredObjectHelper
String queueName = (String) attributeMap.get(Queue.NAME);
String owner = (String) attributeMap.get(Queue.OWNER);
boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
+ UUID alternateExchangeId = attributeMap.get(Queue.ALTERNATE_EXCHANGE) == null ? null : UUID.fromString((String)attributeMap.get(Queue.ALTERNATE_EXCHANGE));
@SuppressWarnings("unchecked")
- Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(QUEUE_ARGUMENTS);
+ Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(Queue.ARGUMENTS);
FieldTable arguments = null;
if (queueArgumentsMap != null)
{
arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
}
- qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments);
+ qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments, alternateExchangeId);
}
}
@@ -73,6 +69,24 @@ public class ConfiguredObjectHelper
Map<String, Object> attributesMap = _serializer.deserialize(queueRecord.getAttributes());
attributesMap.put(Queue.NAME, queue.getName());
attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+ if (queue.getAlternateExchange() != null)
+ {
+ attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
+ }
+ else
+ {
+ attributesMap.remove(Queue.ALTERNATE_EXCHANGE);
+ }
+ if (attributesMap.containsKey(Queue.ARGUMENTS))
+ {
+ // We wouldn't need this if createQueueConfiguredObject took only AMQQueue
+ Map<String, Object> currentArgs = (Map<String, Object>) attributesMap.get(Queue.ARGUMENTS);
+ currentArgs.putAll(queue.getArguments());
+ }
+ else
+ {
+ attributesMap.put(Queue.ARGUMENTS, queue.getArguments());
+ }
String newJson = _serializer.serialize(attributesMap);
ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(queue.getId(), queueRecord.getType(), newJson);
return newQueueRecord;
@@ -84,9 +98,15 @@ public class ConfiguredObjectHelper
attributesMap.put(Queue.NAME, queue.getName());
attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+ if (queue.getAlternateExchange() != null)
+ {
+ attributesMap.put(Queue.ALTERNATE_EXCHANGE, queue.getAlternateExchange().getId());
+ }
+ // TODO KW i think the arguments could come from the queue itself removing the need for the parameter arguments.
+ // It would also do away with the need for the if/then/else within updateQueueConfiguredObject
if (arguments != null)
{
- attributesMap.put(QUEUE_ARGUMENTS, FieldTable.convertToMap(arguments));
+ attributesMap.put(Queue.ARGUMENTS, FieldTable.convertToMap(arguments));
}
String json = _serializer.serialize(attributesMap);
ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(queue.getId(), Queue.class.getName(), json);
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/Event.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/Event.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/Event.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/Event.java Fri Aug 3 12:13:32 2012
@@ -23,10 +23,20 @@ public enum Event
{
BEFORE_INIT,
AFTER_INIT,
+
BEFORE_ACTIVATE,
AFTER_ACTIVATE,
+
BEFORE_PASSIVATE,
AFTER_PASSIVATE,
+
BEFORE_CLOSE,
- AFTER_CLOSE
+ AFTER_CLOSE,
+
+ BEFORE_QUIESCE,
+ AFTER_QUIESCE,
+ BEFORE_RESTART,
+
+ PERSISTENT_MESSAGE_SIZE_OVERFULL,
+ PERSISTENT_MESSAGE_SIZE_UNDERFULL
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/EventManager.java Fri Aug 3 12:13:32 2012
@@ -24,9 +24,12 @@ import java.util.EnumMap;
import java.util.List;
import java.util.Map;
+import org.apache.log4j.Logger;
+
public class EventManager
{
private Map<Event, List<EventListener>> _listeners = new EnumMap<Event, List<EventListener>> (Event.class);
+ private static final Logger _LOGGER = Logger.getLogger(EventManager.class);
public synchronized void addEventListener(EventListener listener, Event... events)
{
@@ -46,6 +49,11 @@ public class EventManager
{
if (_listeners.containsKey(event))
{
+ if(_LOGGER.isDebugEnabled())
+ {
+ _LOGGER.debug("Received event " + event);
+ }
+
for (EventListener listener : _listeners.get(event))
{
listener.event(event);
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Fri Aug 3 12:13:32 2012
@@ -83,19 +83,19 @@ public class MemoryMessageStore extends
@Override
public void configureConfigStore(String name, ConfigurationRecoveryHandler recoveryHandler, Configuration config) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
}
@Override
public void configureMessageStore(String name, MessageStoreRecoveryHandler recoveryHandler, TransactionLogRecoveryHandler tlogRecoveryHandler, Configuration config) throws Exception
{
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
_stateManager.attainState(State.ACTIVE);
}
@@ -134,4 +134,10 @@ public class MemoryMessageStore extends
{
_eventManager.addEventListener(eventListener, events);
}
+
+ @Override
+ public String getStoreType()
+ {
+ return "Memory";
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Fri Aug 3 12:13:32 2012
@@ -67,4 +67,6 @@ public interface MessageStore extends Du
void addEventListener(EventListener eventListener, Event... events);
String getStoreLocation();
+
+ String getStoreType();
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/MessageStoreConstants.java Fri Aug 3 12:13:32 2012
@@ -23,5 +23,7 @@ public class MessageStoreConstants
{
public static final String ENVIRONMENT_PATH_PROPERTY = "environment-path";
+ public static final String OVERFULL_SIZE_PROPERTY = "overfull-size";
+ public static final String UNDERFULL_SIZE_PROPERTY = "underfull-size";
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java Fri Aug 3 12:13:32 2012
@@ -28,7 +28,7 @@ import org.apache.qpid.server.federation
import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.queue.AMQQueue;
-public class NullMessageStore implements MessageStore
+public abstract class NullMessageStore implements MessageStore
{
@Override
public void configureConfigStore(String name,
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/OperationalLoggingListener.java Fri Aug 3 12:13:32 2012
@@ -33,7 +33,14 @@ public class OperationalLoggingListener
private OperationalLoggingListener(final MessageStore store, LogSubject logSubject)
{
_logSubject = logSubject;
- store.addEventListener(this, Event.BEFORE_INIT, Event.AFTER_INIT, Event.BEFORE_ACTIVATE, Event.AFTER_ACTIVATE, Event.AFTER_CLOSE);
+ store.addEventListener(this,
+ Event.BEFORE_INIT,
+ Event.AFTER_INIT,
+ Event.BEFORE_ACTIVATE,
+ Event.AFTER_ACTIVATE,
+ Event.AFTER_CLOSE,
+ Event.PERSISTENT_MESSAGE_SIZE_OVERFULL,
+ Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
_store = store;
}
@@ -62,7 +69,13 @@ public class OperationalLoggingListener
case AFTER_CLOSE:
CurrentActor.get().message(_logSubject,MessageStoreMessages.CLOSED());
break;
-
+ case PERSISTENT_MESSAGE_SIZE_OVERFULL:
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.OVERFULL());
+ break;
+ case PERSISTENT_MESSAGE_SIZE_UNDERFULL:
+ CurrentActor.get().message(_logSubject,MessageStoreMessages.UNDERFULL());
+ break;
+
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/State.java Fri Aug 3 12:13:32 2012
@@ -20,19 +20,30 @@
*/
package org.apache.qpid.server.store;
+import org.apache.qpid.server.configuration.ConfiguredObject;
+
public enum State
{
-
+ /** The initial state of the store. In practice, the store immediately transitions to the subsequent states. */
INITIAL,
- CONFIGURING,
- CONFIGURED,
- RECOVERING,
+
+ INITIALISING,
+ /**
+ * The initial set-up of the store has completed.
+ * If the store is persistent, it has not yet loaded configuration for {@link ConfiguredObject}'s from disk.
+ *
+ * From the point of view of the user, the store is essentially stopped.
+ */
+ INITIALISED,
+
+ ACTIVATING,
ACTIVE,
- QUIESCING,
- QUIESCED,
+
CLOSING,
- CLOSED;
-
+ CLOSED,
+ QUIESCING,
+ /** The virtual host (and implicitly also the store) has been manually paused by the user to allow configuration changes to take place */
+ QUIESCED;
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/StateManager.java Fri Aug 3 12:13:32 2012
@@ -24,6 +24,8 @@ package org.apache.qpid.server.store;
import java.util.EnumMap;
import java.util.Map;
+import org.apache.qpid.server.store.StateManager.Transition;
+
public class StateManager
{
private State _state = State.INITIAL;
@@ -70,16 +72,23 @@ public class StateManager
}
- public static final Transition CONFIGURE = new Transition(State.INITIAL, State.CONFIGURING, Event.BEFORE_INIT);
- public static final Transition CONFIGURE_COMPLETE = new Transition(State.CONFIGURING, State.CONFIGURED, Event.AFTER_INIT);
- public static final Transition RECOVER = new Transition(State.CONFIGURED, State.RECOVERING, Event.BEFORE_ACTIVATE);
- public static final Transition ACTIVATE = new Transition(State.RECOVERING, State.ACTIVE, Event.AFTER_ACTIVATE);
+ public static final Transition INITIALISE = new Transition(State.INITIAL, State.INITIALISING, Event.BEFORE_INIT);
+ public static final Transition INITALISE_COMPLETE = new Transition(State.INITIALISING, State.INITIALISED, Event.AFTER_INIT);
+
+ public static final Transition ACTIVATE = new Transition(State.INITIALISED, State.ACTIVATING, Event.BEFORE_ACTIVATE);
+ public static final Transition ACTIVATE_COMPLETE = new Transition(State.ACTIVATING, State.ACTIVE, Event.AFTER_ACTIVATE);
+
+ public static final Transition CLOSE_INITIALISED = new Transition(State.INITIALISED, State.CLOSING, Event.BEFORE_CLOSE);;
public static final Transition CLOSE_ACTIVE = new Transition(State.ACTIVE, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_QUIESCED = new Transition(State.QUIESCED, State.CLOSING, Event.BEFORE_CLOSE);
public static final Transition CLOSE_COMPLETE = new Transition(State.CLOSING, State.CLOSED, Event.AFTER_CLOSE);
- public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_PASSIVATE);
- public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.BEFORE_PASSIVATE);
- public static final Transition RESTART = new Transition(State.QUIESCED, State.RECOVERING, Event.BEFORE_ACTIVATE);
+
+ public static final Transition PASSIVATE = new Transition(State.ACTIVE, State.INITIALISED, Event.BEFORE_PASSIVATE);
+
+ public static final Transition QUIESCE = new Transition(State.ACTIVE, State.QUIESCING, Event.BEFORE_QUIESCE);
+ public static final Transition QUIESCE_COMPLETE = new Transition(State.QUIESCING, State.QUIESCED, Event.AFTER_QUIESCE);
+
+ public static final Transition RESTART = new Transition(State.QUIESCED, State.ACTIVATING, Event.BEFORE_RESTART);
public StateManager(final EventManager eventManager)
@@ -105,16 +114,6 @@ public class StateManager
return _state;
}
- public synchronized void stateTransition(final State current, final State desired)
- {
- if (_state != current)
- {
- throw new IllegalStateException("Cannot transition to the state: " + desired + "; need to be in state: " + current
- + "; currently in state: " + _state);
- }
- attainState(desired);
- }
-
public synchronized void attainState(State desired)
{
Transition transition = null;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java Fri Aug 3 12:13:32 2012
@@ -31,6 +31,7 @@ import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.sql.Blob;
+import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.Driver;
import java.sql.DriverManager;
@@ -59,6 +60,7 @@ import org.apache.qpid.server.federation
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler;
import org.apache.qpid.server.store.ConfiguredObjectHelper;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.Event;
@@ -77,6 +79,9 @@ import org.apache.qpid.server.store.Stor
import org.apache.qpid.server.store.Transaction;
import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
import org.apache.qpid.server.store.TransactionLogResource;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
/**
* An implementation of a {@link MessageStore} that uses Apache Derby as the persistence
@@ -111,6 +116,7 @@ public class DerbyMessageStore implement
private static Class<Driver> DRIVER_CLASS;
+ public static final String MEMORY_STORE_LOCATION = ":memory:";
private final AtomicLong _messageId = new AtomicLong(0);
private AtomicBoolean _closed = new AtomicBoolean(false);
@@ -230,10 +236,17 @@ public class DerbyMessageStore implement
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
+ private static final String DERBY_STORE_TYPE = "DERBY";
+
private final StateManager _stateManager;
-
+
private final EventManager _eventManager = new EventManager();
+ private long _totalStoreSize;
+ private boolean _limitBusted;
+ private long _persistentSizeLowThreshold;
+ private long _persistentSizeHighThreshold;
+
private MessageStoreRecoveryHandler _messageRecoveryHandler;
private TransactionLogRecoveryHandler _tlogRecoveryHandler;
@@ -253,7 +266,7 @@ public class DerbyMessageStore implement
ConfigurationRecoveryHandler configRecoveryHandler,
Configuration storeConfiguration) throws Exception
{
- _stateManager.attainState(State.CONFIGURING);
+ _stateManager.attainState(State.INITIALISING);
_configRecoveryHandler = configRecoveryHandler;
commonConfiguration(name, storeConfiguration);
@@ -269,13 +282,13 @@ public class DerbyMessageStore implement
_tlogRecoveryHandler = tlogRecoveryHandler;
_messageRecoveryHandler = recoveryHandler;
- _stateManager.attainState(State.CONFIGURED);
+ _stateManager.attainState(State.INITIALISED);
}
@Override
public void activate() throws Exception
{
- _stateManager.attainState(State.RECOVERING);
+ _stateManager.attainState(State.ACTIVATING);
// this recovers durable exchanges, queues, and bindings
recoverConfiguration(_configRecoveryHandler);
@@ -296,19 +309,39 @@ public class DerbyMessageStore implement
final String databasePath = storeConfiguration.getString(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY, System.getProperty("QPID_WORK")
+ File.separator + "derbyDB");
- File environmentPath = new File(databasePath);
- if (!environmentPath.exists())
+ if(!MEMORY_STORE_LOCATION.equals(databasePath))
{
- if (!environmentPath.mkdirs())
+ File environmentPath = new File(databasePath);
+ if (!environmentPath.exists())
{
- throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
- + "Ensure the path is correct and that the permissions are correct.");
+ if (!environmentPath.mkdirs())
+ {
+ throw new IllegalArgumentException("Environment path " + environmentPath + " could not be read or created. "
+ + "Ensure the path is correct and that the permissions are correct.");
+ }
}
}
_storeLocation = databasePath;
+ _persistentSizeHighThreshold = storeConfiguration.getLong(MessageStoreConstants.OVERFULL_SIZE_PROPERTY, -1l);
+ _persistentSizeLowThreshold = storeConfiguration.getLong(MessageStoreConstants.UNDERFULL_SIZE_PROPERTY, _persistentSizeHighThreshold);
+ if(_persistentSizeLowThreshold > _persistentSizeHighThreshold || _persistentSizeLowThreshold < 0l)
+ {
+ _persistentSizeLowThreshold = _persistentSizeHighThreshold;
+ }
+
createOrOpenDatabase(name, databasePath);
+
+ Connection conn = newAutoCommitConnection();;
+ try
+ {
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+ finally
+ {
+ conn.close();
+ }
}
private static synchronized void initialiseDriver() throws ClassNotFoundException
@@ -322,7 +355,7 @@ public class DerbyMessageStore implement
private void createOrOpenDatabase(String name, final String environmentPath) throws SQLException
{
//FIXME this the _vhost name should not be added here, but derby wont use an empty directory as was possibly just created.
- _connectionURL = "jdbc:derby:" + environmentPath + "/" + name + ";create=true";
+ _connectionURL = "jdbc:derby" + (environmentPath.equals(MEMORY_STORE_LOCATION) ? environmentPath : ":" + environmentPath + "/") + name + ";create=true";
Connection conn = newAutoCommitConnection();
@@ -529,16 +562,17 @@ public class DerbyMessageStore implement
try
{
List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
- ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
- ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
+ ExchangeRecoveryHandler erh = recoveryHandler.begin(this);
_configuredObjectHelper.recoverExchanges(erh, configuredObjects);
- ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
+ QueueRecoveryHandler qrh = erh.completeExchangeRecovery();
+ _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
+
+ BindingRecoveryHandler brh = qrh.completeQueueRecovery();
_configuredObjectHelper.recoverBindings(brh, configuredObjects);
- ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
+ BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
recoverBrokerLinks(lrh);
}
catch (SQLException e)
@@ -689,7 +723,7 @@ public class DerbyMessageStore implement
public void close() throws Exception
{
_closed.getAndSet(true);
- _stateManager.stateTransition(State.ACTIVE, State.CLOSING);
+ _stateManager.attainState(State.CLOSING);
try
{
@@ -710,7 +744,7 @@ public class DerbyMessageStore implement
}
}
- _stateManager.stateTransition(State.CLOSING, State.CLOSED);
+ _stateManager.attainState(State.CLOSED);
}
@Override
@@ -956,8 +990,8 @@ public class DerbyMessageStore implement
try
{
- stmt.setLong(1, link.getId().getLeastSignificantBits());
- stmt.setLong(2, link.getId().getMostSignificantBits());
+ stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
+ stmt.setLong(2, link.getQMFId().getMostSignificantBits());
ResultSet rs = stmt.executeQuery();
try
{
@@ -970,8 +1004,8 @@ public class DerbyMessageStore implement
try
{
- insertStmt.setLong(1, link.getId().getLeastSignificantBits());
- insertStmt.setLong(2, link.getId().getMostSignificantBits());
+ insertStmt.setLong(1, link.getQMFId().getLeastSignificantBits());
+ insertStmt.setLong(2, link.getQMFId().getMostSignificantBits());
insertStmt.setLong(3, link.getCreateTime());
byte[] argumentBytes = convertStringMapToBytes(link.getArguments());
@@ -1048,8 +1082,8 @@ public class DerbyMessageStore implement
{
conn = newAutoCommitConnection();
stmt = conn.prepareStatement(DELETE_FROM_LINKS);
- stmt.setLong(1, link.getId().getLeastSignificantBits());
- stmt.setLong(2, link.getId().getMostSignificantBits());
+ stmt.setLong(1, link.getQMFId().getLeastSignificantBits());
+ stmt.setLong(2, link.getQMFId().getMostSignificantBits());
int results = stmt.executeUpdate();
if (results == 0)
@@ -1085,7 +1119,7 @@ public class DerbyMessageStore implement
try
{
- UUID id = bridge.getId();
+ UUID id = bridge.getQMFId();
stmt.setLong(1, id.getLeastSignificantBits());
stmt.setLong(2, id.getMostSignificantBits());
ResultSet rs = stmt.executeQuery();
@@ -1105,7 +1139,7 @@ public class DerbyMessageStore implement
insertStmt.setLong(3, bridge.getCreateTime());
- UUID linkId = bridge.getLink().getId();
+ UUID linkId = bridge.getLink().getQMFId();
insertStmt.setLong(4, linkId.getLeastSignificantBits());
insertStmt.setLong(5, linkId.getMostSignificantBits());
@@ -1151,8 +1185,8 @@ public class DerbyMessageStore implement
{
conn = newAutoCommitConnection();
stmt = conn.prepareStatement(DELETE_FROM_BRIDGES);
- stmt.setLong(1, bridge.getId().getLeastSignificantBits());
- stmt.setLong(2, bridge.getId().getMostSignificantBits());
+ stmt.setLong(1, bridge.getQMFId().getLeastSignificantBits());
+ stmt.setLong(2, bridge.getQMFId().getMostSignificantBits());
int results = stmt.executeUpdate();
if (results == 0)
@@ -1541,7 +1575,7 @@ public class DerbyMessageStore implement
buf = buf.slice();
MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
- StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, false);
+ StoredDerbyMessage message = new StoredDerbyMessage(messageId, metaData, true);
messageHandler.message(message);
}
@@ -1921,6 +1955,7 @@ public class DerbyMessageStore implement
private class DerbyTransaction implements Transaction
{
private final ConnectionWrapper _connWrapper;
+ private int _storeSizeIncrease;
private DerbyTransaction()
@@ -1938,18 +1973,19 @@ public class DerbyMessageStore implement
@Override
public void enqueueMessage(TransactionLogResource queue, EnqueableMessage message) throws AMQStoreException
{
- if(message.getStoredMessage() instanceof StoredDerbyMessage)
+ final StoredMessage storedMessage = message.getStoredMessage();
+ if(storedMessage instanceof StoredDerbyMessage)
{
try
{
- ((StoredDerbyMessage)message.getStoredMessage()).store(_connWrapper.getConnection());
+ ((StoredDerbyMessage) storedMessage).store(_connWrapper.getConnection());
}
catch (SQLException e)
{
throw new AMQStoreException("Exception on enqueuing message " + _messageId, e);
}
}
-
+ _storeSizeIncrease += storedMessage.getMetaData().getContentSize();
DerbyMessageStore.this.enqueueMessage(_connWrapper, queue, message.getMessageNumber());
}
@@ -1964,12 +2000,15 @@ public class DerbyMessageStore implement
public void commitTran() throws AMQStoreException
{
DerbyMessageStore.this.commitTran(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
}
@Override
public StoreFuture commitTranAsync() throws AMQStoreException
{
- return DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ final StoreFuture storeFuture = DerbyMessageStore.this.commitTranAsync(_connWrapper);
+ storedSizeChange(_storeSizeIncrease);
+ return storeFuture;
}
@Override
@@ -1998,6 +2037,8 @@ public class DerbyMessageStore implement
{
private final long _messageId;
+ private final boolean _isRecovered;
+
private StorableMessageMetaData _metaData;
private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
private byte[] _data;
@@ -2006,21 +2047,21 @@ public class DerbyMessageStore implement
StoredDerbyMessage(long messageId, StorableMessageMetaData metaData)
{
- this(messageId, metaData, true);
+ this(messageId, metaData, false);
}
StoredDerbyMessage(long messageId,
- StorableMessageMetaData metaData, boolean persist)
+ StorableMessageMetaData metaData, boolean isRecovered)
{
_messageId = messageId;
+ _isRecovered = isRecovered;
-
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
- if(persist)
+ if(!_isRecovered)
{
_metaData = metaData;
}
+ _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
@Override
@@ -2101,16 +2142,17 @@ public class DerbyMessageStore implement
@Override
public synchronized StoreFuture flushToStore()
{
+ Connection conn = null;
try
{
- if(_metaData != null)
+ if(!stored())
{
- Connection conn = newConnection();
+ conn = newConnection();
store(conn);
conn.commit();
- conn.close();
+ storedSizeChange(getMetaData().getContentSize());
}
}
catch (SQLException e)
@@ -2121,12 +2163,24 @@ public class DerbyMessageStore implement
}
throw new RuntimeException(e);
}
+ finally
+ {
+ closeConnection(conn);
+ }
return StoreFuture.IMMEDIATE_FUTURE;
}
+ @Override
+ public void remove()
+ {
+ int delta = getMetaData().getContentSize();
+ DerbyMessageStore.this.removeMessage(_messageId);
+ storedSizeChange(-delta);
+ }
+
private synchronized void store(final Connection conn) throws SQLException
{
- if(_metaData != null)
+ if (!stored())
{
try
{
@@ -2139,18 +2193,17 @@ public class DerbyMessageStore implement
_metaData = null;
_data = null;
}
- }
- if(_logger.isDebugEnabled())
- {
- _logger.debug("Storing message " + _messageId + " to store");
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Storing message " + _messageId + " to store");
+ }
}
}
- @Override
- public void remove()
+ private boolean stored()
{
- DerbyMessageStore.this.removeMessage(_messageId);
+ return _metaData == null || _isRecovered;
}
}
@@ -2446,4 +2499,181 @@ public class DerbyMessageStore implement
}
return results;
}
+
+ private synchronized void storedSizeChange(final int delta)
+ {
+ if(getPersistentSizeHighThreshold() > 0)
+ {
+ synchronized(this)
+ {
+ // the delta supplied is an approximation of a store size change. we don;t want to check the statistic every
+ // time, so we do so only when there's been enough change that it is worth looking again. We do this by
+ // assuming the total size will change by less than twice the amount of the message data change.
+ long newSize = _totalStoreSize += 3*delta;
+
+ Connection conn = null;
+ try
+ {
+
+ if(!_limitBusted && newSize > getPersistentSizeHighThreshold())
+ {
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(_totalStoreSize > getPersistentSizeHighThreshold())
+ {
+ _limitBusted = true;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_OVERFULL);
+ }
+ }
+ else if(_limitBusted && newSize < getPersistentSizeLowThreshold())
+ {
+ long oldSize = _totalStoreSize;
+ conn = newAutoCommitConnection();
+ _totalStoreSize = getSizeOnDisk(conn);
+ if(oldSize <= _totalStoreSize)
+ {
+
+ reduceSizeOnDisk(conn);
+
+ _totalStoreSize = getSizeOnDisk(conn);
+ }
+
+ if(_totalStoreSize < getPersistentSizeLowThreshold())
+ {
+ _limitBusted = false;
+ _eventManager.notifyEvent(Event.PERSISTENT_MESSAGE_SIZE_UNDERFULL);
+ }
+
+
+ }
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Exception will processing store size change", e);
+ }
+ }
+ }
+ }
+
+ private void reduceSizeOnDisk(Connection conn)
+ {
+ CallableStatement cs = null;
+ PreparedStatement stmt = null;
+ try
+ {
+ String tableQuery =
+ "SELECT S.SCHEMANAME, T.TABLENAME FROM SYS.SYSSCHEMAS S, SYS.SYSTABLES T WHERE S.SCHEMAID = T.SCHEMAID AND T.TABLETYPE='T'";
+ stmt = conn.prepareStatement(tableQuery);
+ ResultSet rs = null;
+
+ List<String> schemas = new ArrayList<String>();
+ List<String> tables = new ArrayList<String>();
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ schemas.add(rs.getString(1));
+ tables.add(rs.getString(2));
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+
+ cs = conn.prepareCall
+ ("CALL SYSCS_UTIL.SYSCS_COMPRESS_TABLE(?, ?, ?)");
+
+ for(int i = 0; i < schemas.size(); i++)
+ {
+ cs.setString(1, schemas.get(i));
+ cs.setString(2, tables.get(i));
+ cs.setShort(3, (short) 0);
+ cs.execute();
+ }
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error reducing on disk size", e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ closePreparedStatement(cs);
+ }
+
+ }
+
+ private long getSizeOnDisk(Connection conn)
+ {
+ PreparedStatement stmt = null;
+ try
+ {
+ String sizeQuery = "SELECT SUM(T2.NUMALLOCATEDPAGES * T2.PAGESIZE) TOTALSIZE" +
+ " FROM " +
+ " SYS.SYSTABLES systabs," +
+ " TABLE (SYSCS_DIAG.SPACE_TABLE(systabs.tablename)) AS T2" +
+ " WHERE systabs.tabletype = 'T'";
+
+ stmt = conn.prepareStatement(sizeQuery);
+
+ ResultSet rs = null;
+ long size = 0l;
+
+ try
+ {
+ rs = stmt.executeQuery();
+ while(rs.next())
+ {
+ size = rs.getLong(1);
+ }
+ }
+ finally
+ {
+ if(rs != null)
+ {
+ rs.close();
+ }
+ }
+
+ return size;
+
+ }
+ catch (SQLException e)
+ {
+ closeConnection(conn);
+ throw new RuntimeException("Error establishing on disk size", e);
+ }
+ finally
+ {
+ closePreparedStatement(stmt);
+ }
+
+ }
+
+
+ private long getPersistentSizeLowThreshold()
+ {
+ return _persistentSizeLowThreshold;
+ }
+
+ private long getPersistentSizeHighThreshold()
+ {
+ return _persistentSizeHighThreshold;
+ }
+
+ @Override
+ public String getStoreType()
+ {
+ return DERBY_STORE_TYPE;
+ }
+
}
\ No newline at end of file
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/AssignedSubscriptionMessageGroupManager.java Fri Aug 3 12:13:32 2012
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.Iterator;
@@ -102,7 +102,7 @@ public class AssignedSubscriptionMessage
return visitor.getEntry();
}
- private class EntryFinder implements AMQQueue.Visitor
+ private class EntryFinder implements QueueEntryVisitor
{
private QueueEntry _entry;
private Subscription _sub;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/DefinedGroupMessageGroupManager.java Fri Aug 3 12:13:32 2012
@@ -20,12 +20,12 @@
*/
package org.apache.qpid.server.subscription;
+import org.apache.qpid.server.queue.QueueEntryVisitor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.ServerMessage;
-import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import java.util.HashMap;
@@ -176,7 +176,7 @@ public class DefinedGroupMessageGroupMan
return visitor.getEntry();
}
- private class EntryFinder implements AMQQueue.Visitor
+ private class EntryFinder implements QueueEntryVisitor
{
private QueueEntry _entry;
private Subscription _sub;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java Fri Aug 3 12:13:32 2012
@@ -45,7 +45,7 @@ class ExplicitAcceptDispositionChangeLis
final Subscription_0_10 subscription = getSubscription();
if(subscription != null && _entry.isAcquiredBy(_sub))
{
- subscription.getSession().acknowledge(subscription, _entry);
+ subscription.getSessionModel().acknowledge(subscription, _entry);
}
else
{
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java Fri Aug 3 12:13:32 2012
@@ -72,6 +72,10 @@ class ImplicitAcceptDispositionChangeLis
public boolean acquire()
{
boolean acquired = _entry.acquire(getSubscription());
+ if(acquired)
+ {
+ getSubscription().recordUnacknowledged(_entry);
+ }
return acquired;
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription.java Fri Aug 3 12:13:32 2012
@@ -23,6 +23,7 @@ package org.apache.qpid.server.subscript
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -32,6 +33,14 @@ public interface Subscription
boolean isTransient();
+ long getBytesOut();
+
+ long getMessagesOut();
+
+ long getUnacknowledgedBytes();
+
+ long getUnacknowledgedMessages();
+
public static enum State
{
ACTIVE,
@@ -45,6 +54,7 @@ public interface Subscription
}
AMQQueue getQueue();
+ AMQSessionModel getSessionModel();
QueueEntry.SubscriptionAcquiredState getOwningState();
QueueEntry.SubscriptionAssignedState getAssignedState();
@@ -108,4 +118,6 @@ public interface Subscription
boolean isSessionTransactional();
void queueEmpty() throws AMQException;
+
+ String getConsumerName();
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java Fri Aug 3 12:13:32 2012
@@ -44,6 +44,7 @@ import org.apache.qpid.server.logging.su
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
@@ -75,7 +76,7 @@ public abstract class SubscriptionImpl i
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private AMQQueue.Context _queueContext;
+ private volatile AMQQueue.Context _queueContext;
private final ClientDeliveryMethod _deliveryMethod;
private final RecordDeliveryMethod _recordMethod;
@@ -90,10 +91,15 @@ public abstract class SubscriptionImpl i
private final long _subscriptionID;
private LogSubject _logSubject;
private LogActor _logActor;
- private UUID _id;
+ private UUID _qmfId;
private final AtomicLong _deliveredCount = new AtomicLong(0);
- private long _createTime = System.currentTimeMillis();
+ private final AtomicLong _deliveredBytes = new AtomicLong(0);
+
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+ private long _createTime = System.currentTimeMillis();
+
static final class BrowserSubscription extends SubscriptionImpl
{
@@ -276,22 +282,13 @@ public abstract class SubscriptionImpl i
public void send(QueueEntry entry, boolean batch) throws AMQException
{
- // if we do not need to wait for client acknowledgements
- // we can decrement the reference count immediately.
-
- // By doing this _before_ the send we ensure that it
- // doesn't get sent if it can't be dequeued, preventing
- // duplicate delivery on recovery.
-
- // The send may of course still fail, in which case, as
- // the message is unacked, it will be lost.
-
+
synchronized (getChannel())
{
getChannel().getProtocolSession().setDeferFlush(batch);
long deliveryTag = getChannel().getNextDeliveryTag();
-
+ addUnacknowledgedMessage(entry);
recordMessageDelivery(entry, deliveryTag);
sendToClient(entry, deliveryTag);
@@ -371,6 +368,11 @@ public abstract class SubscriptionImpl i
}
+ public AMQSessionModel getSessionModel()
+ {
+ return _channel;
+ }
+
public ConfigStore getConfigStore()
{
return getQueue().getConfigStore();
@@ -389,7 +391,7 @@ public abstract class SubscriptionImpl i
}
_queue = queue;
- _id = getConfigStore().createId();
+ _qmfId = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
_logSubject = new SubscriptionLogSubject(this);
@@ -468,11 +470,6 @@ public abstract class SubscriptionImpl i
_deleted.set(true);
}
- public boolean filtersMessages()
- {
- return _filters != null || _noLocal;
- }
-
public boolean hasInterest(QueueEntry entry)
{
//check that the message hasn't been rejected
@@ -508,13 +505,6 @@ public abstract class SubscriptionImpl i
}
- private String id = String.valueOf(System.identityHashCode(this));
-
- private String debugIdentity()
- {
- return id;
- }
-
private boolean checkFilters(QueueEntry msg)
{
return (_filters == null) || _filters.allAllow(msg);
@@ -599,6 +589,11 @@ public abstract class SubscriptionImpl i
return _consumerTag;
}
+ public String getConsumerName()
+ {
+ return _consumerTag == null ? null : _consumerTag.asString();
+ }
+
public long getSubscriptionID()
{
return _subscriptionID;
@@ -687,6 +682,7 @@ public abstract class SubscriptionImpl i
{
_deliveryMethod.deliverToClient(this,entry,deliveryTag);
_deliveredCount.incrementAndGet();
+ _deliveredBytes.addAndGet(entry.getSize());
}
@@ -771,9 +767,10 @@ public abstract class SubscriptionImpl i
return true;
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public boolean isDurable()
@@ -832,4 +829,44 @@ public abstract class SubscriptionImpl i
_channel.getProtocolSession().flushBatched();
}
+
+ public long getBytesOut()
+ {
+ return _deliveredBytes.longValue();
+ }
+
+ public long getMessagesOut()
+ {
+ return _deliveredCount.longValue();
+ }
+
+
+ protected void addUnacknowledgedMessage(QueueEntry entry)
+ {
+ final long size = entry.getSize();
+ _unacknowledgedBytes.addAndGet(size);
+ _unacknowledgedCount.incrementAndGet();
+ entry.addStateChangeListener(new QueueEntry.StateChangeListener()
+ {
+ public void stateChanged(QueueEntry entry, QueueEntry.State oldState, QueueEntry.State newState)
+ {
+ if(oldState.equals(QueueEntry.State.ACQUIRED) && !newState.equals(QueueEntry.State.ACQUIRED))
+ {
+ _unacknowledgedBytes.addAndGet(-size);
+ _unacknowledgedCount.decrementAndGet();
+ entry.removeStateChangeListener(this);
+ }
+ }
+ });
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java Fri Aug 3 12:13:32 2012
@@ -98,7 +98,7 @@ public class Subscription_0_10 implement
private final Lock _stateChangeLock = new ReentrantLock();
private final AtomicReference<State> _state = new AtomicReference<State>(State.ACTIVE);
- private AMQQueue.Context _queueContext;
+ private volatile AMQQueue.Context _queueContext;
private final AtomicBoolean _deleted = new AtomicBoolean(false);
@@ -125,11 +125,15 @@ public class Subscription_0_10 implement
private LogActor _logActor;
private final Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
- private UUID _id;
+ private UUID _qmfId;
private String _traceExclude;
private String _trace;
private final long _createTime = System.currentTimeMillis();
private final AtomicLong _deliveredCount = new AtomicLong(0);
+ private final AtomicLong _deliveredBytes = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedCount = new AtomicLong(0);
+ private final AtomicLong _unacknowledgedBytes = new AtomicLong(0);
+
private final Map<String, Object> _arguments;
private int _deferredMessageCredit;
private long _deferredSizeCredit;
@@ -185,10 +189,10 @@ public class Subscription_0_10 implement
}
_queue = queue;
- Map<String, Object> arguments = queue.getArguments() == null ? Collections.EMPTY_MAP : queue.getArguments();
+ Map<String, Object> arguments = queue.getArguments();
_traceExclude = (String) arguments.get("qpid.trace.exclude");
_trace = (String) arguments.get("qpid.trace.id");
- _id = getConfigStore().createId();
+ _qmfId = getConfigStore().createId();
getConfigStore().addConfiguredObject(this);
String filterLogString = null;
@@ -199,9 +203,13 @@ public class Subscription_0_10 implement
CurrentActor.get().message(this, SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive,
filterLogString.length() > 0));
}
-
}
+ public String getConsumerName()
+ {
+ return _destination;
+ }
+
public boolean isSuspended()
{
return !isActive() || _deleted.get() || _session.isClosing(); // TODO check for Session suspension
@@ -620,10 +628,15 @@ public class Subscription_0_10 implement
_session.sendMessage(xfr, _postIdSettingAction);
entry.incrementDeliveryCount();
_deliveredCount.incrementAndGet();
+ _deliveredBytes.addAndGet(entry.getSize());
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
{
forceDequeue(entry, false);
}
+ else if(_acquireMode == MessageAcquireMode.PRE_ACQUIRED)
+ {
+ recordUnacknowledged(entry);
+ }
}
else
{
@@ -632,6 +645,12 @@ public class Subscription_0_10 implement
}
}
+ void recordUnacknowledged(QueueEntry entry)
+ {
+ _unacknowledgedCount.incrementAndGet();
+ _unacknowledgedBytes.addAndGet(entry.getSize());
+ }
+
private void deferredAddCredit(final int deferredMessageCredit, final long deferredSizeCredit)
{
_deferredMessageCredit += deferredMessageCredit;
@@ -653,7 +672,7 @@ public class Subscription_0_10 implement
private void forceDequeue(final QueueEntry entry, final boolean restoreCredit)
{
- AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
+ AutoCommitTransaction dequeueTxn = new AutoCommitTransaction(getQueue().getVirtualHost().getMessageStore());
dequeueTxn.dequeue(entry.getQueue(), entry.getMessage(),
new ServerTransaction.Action()
{
@@ -690,7 +709,7 @@ public class Subscription_0_10 implement
entry.setRedelivered();
}
- if (getSession().isClosing() || !setRedelivered)
+ if (getSessionModel().isClosing() || !setRedelivered)
{
entry.decrementDeliveryCount();
}
@@ -918,6 +937,8 @@ public class Subscription_0_10 implement
// TODO Fix Store Context / cleanup
if(entry.isAcquiredBy(this))
{
+ _unacknowledgedBytes.addAndGet(-entry.getSize());
+ _unacknowledgedCount.decrementAndGet();
entry.discard();
}
}
@@ -944,7 +965,7 @@ public class Subscription_0_10 implement
return false;
}
- ServerSession getSession()
+ public ServerSession getSessionModel()
{
return _session;
}
@@ -952,7 +973,7 @@ public class Subscription_0_10 implement
public SessionConfig getSessionConfig()
{
- return getSession();
+ return getSessionModel();
}
public boolean isBrowsing()
@@ -990,9 +1011,10 @@ public class Subscription_0_10 implement
return _flowMode.toString();
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public String getName()
@@ -1073,4 +1095,24 @@ public class Subscription_0_10 implement
{
_session.getConnection().flush();
}
+
+ public long getBytesOut()
+ {
+ return _deliveredBytes.longValue();
+ }
+
+ public long getMessagesOut()
+ {
+ return _deliveredCount.longValue();
+ }
+
+ public long getUnacknowledgedBytes()
+ {
+ return _unacknowledgedBytes.longValue();
+ }
+
+ public long getUnacknowledgedMessages()
+ {
+ return _unacknowledgedCount.longValue();
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/QpidAcceptor.java Fri Aug 3 12:13:32 2012
@@ -20,25 +20,62 @@
*/
package org.apache.qpid.server.transport;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
import org.apache.qpid.transport.network.NetworkTransport;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
public class QpidAcceptor
{
- private NetworkTransport _transport;
- private String _protocol;
- public QpidAcceptor(NetworkTransport transport, String protocol)
+ public enum Transport
{
- _transport = transport;
- _protocol = protocol;
+ TCP("TCP"),
+ SSL("TCP/SSL");
+
+ private final String _asString;
+
+ Transport(String asString)
+ {
+ _asString = asString;
+ }
+
+ public String toString()
+ {
+ return _asString;
+ }
+ }
+
+ private NetworkTransport _networkTransport;
+ private Transport _transport;
+ private Set<AmqpProtocolVersion> _supported;
+
+
+ public QpidAcceptor(NetworkTransport transport, Transport protocol, Set<AmqpProtocolVersion> supported)
+ {
+ _networkTransport = transport;
+ _transport = protocol;
+ _supported = Collections.unmodifiableSet(new HashSet<AmqpProtocolVersion>(supported));
}
public NetworkTransport getNetworkTransport()
{
+ return _networkTransport;
+ }
+
+ public Transport getTransport()
+ {
return _transport;
}
+ public Set<AmqpProtocolVersion> getSupported()
+ {
+ return _supported;
+ }
+
public String toString()
{
- return _protocol;
+ return _transport.toString();
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnection.java Fri Aug 3 12:13:32 2012
@@ -20,6 +20,14 @@
*/
package org.apache.qpid.server.transport;
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import javax.security.auth.Subject;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.configuration.ConnectionConfig;
@@ -28,12 +36,9 @@ import org.apache.qpid.server.logging.Lo
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Connection;
@@ -48,17 +53,7 @@ import static org.apache.qpid.server.log
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
-import javax.management.JMException;
-import javax.security.auth.Subject;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
-public class ServerConnection extends Connection implements Managable, AMQConnectionModel, LogSubject, AuthorizationHolder
+public class ServerConnection extends Connection implements AMQConnectionModel, LogSubject, AuthorizationHolder
{
private ConnectionConfig _config;
private Runnable _onOpenTask;
@@ -67,24 +62,19 @@ public class ServerConnection extends Co
private Subject _authorizedSubject = null;
private Principal _authorizedPrincipal = null;
- private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private final long _connectionId;
private final Object _reference = new Object();
- private ServerConnectionMBean _mBean;
private VirtualHost _virtualHost;
private AtomicLong _lastIoTime = new AtomicLong();
-
+ private boolean _blocking;
+ private Principal _peerPrincipal;
+
public ServerConnection(final long connectionId)
{
_connectionId = connectionId;
}
- public UUID getId()
- {
- return _config.getId();
- }
-
public Object getReference()
{
return _reference;
@@ -100,12 +90,12 @@ public class ServerConnection extends Co
protected void setState(State state)
{
super.setState(state);
-
+
if (state == State.OPEN)
{
if (_onOpenTask != null)
{
- _onOpenTask.run();
+ _onOpenTask.run();
}
_actor.message(ConnectionMessages.OPEN(getClientId(), "0-10", getClientVersion(), true, true, true));
@@ -118,7 +108,6 @@ public class ServerConnection extends Co
{
_virtualHost.getConnectionRegistry().deregisterConnection(this);
}
- unregisterConnectionMbean();
}
if (state == State.CLOSED)
@@ -156,8 +145,6 @@ public class ServerConnection extends Co
_virtualHost = virtualHost;
initialiseStatistics();
-
- registerConnectionMbean();
}
public void setConnectionConfig(final ConnectionConfig config)
@@ -193,7 +180,7 @@ public class ServerConnection extends Co
((ServerSession)session).close();
}
-
+
public LogSubject getLogSubject()
{
return (LogSubject) this;
@@ -273,7 +260,6 @@ public class ServerConnection extends Co
public void close(AMQConstant cause, String message) throws AMQException
{
closeSubscriptions();
- unregisterConnectionMbean();
ConnectionCloseCode replyCode = ConnectionCloseCode.NORMAL;
try
{
@@ -286,6 +272,46 @@ public class ServerConnection extends Co
close(replyCode, message);
}
+ public synchronized void block()
+ {
+ if(!_blocking)
+ {
+ _blocking = true;
+ for(AMQSessionModel ssn : getSessionModels())
+ {
+ ssn.block();
+ }
+ }
+ }
+
+ public synchronized void unblock()
+ {
+ if(_blocking)
+ {
+ _blocking = false;
+ for(AMQSessionModel ssn : getSessionModels())
+ {
+ ssn.unblock();
+ }
+ }
+ }
+
+ @Override
+ public synchronized void registerSession(final Session ssn)
+ {
+ super.registerSession(ssn);
+ if(_blocking)
+ {
+ ((ServerSession)ssn).block();
+ }
+ }
+
+ @Override
+ public synchronized void removeSession(final Session ssn)
+ {
+ super.removeSession(ssn);
+ }
+
public List<AMQSessionModel> getSessionModels()
{
List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
@@ -298,44 +324,38 @@ public class ServerConnection extends Co
public void registerMessageDelivered(long messageSize)
{
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
_virtualHost.registerMessageDelivered(messageSize);
}
public void registerMessageReceived(long messageSize, long timestamp)
{
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
_virtualHost.registerMessageReceived(messageSize, timestamp);
}
-
+
public StatisticsCounter getMessageReceiptStatistics()
{
return _messagesReceived;
}
-
+
public StatisticsCounter getDataReceiptStatistics()
{
return _dataReceived;
}
-
+
public StatisticsCounter getMessageDeliveryStatistics()
{
return _messagesDelivered;
}
-
+
public StatisticsCounter getDataDeliveryStatistics()
{
return _dataDelivered;
}
-
+
public void resetStatistics()
{
_messagesDelivered.reset();
@@ -346,25 +366,12 @@ public class ServerConnection extends Co
public void initialiseStatistics()
{
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _virtualHost.getApplicationRegistry().getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getConnectionId());
_dataDelivered = new StatisticsCounter("data-delivered-" + getConnectionId());
_messagesReceived = new StatisticsCounter("messages-received-" + getConnectionId());
_dataReceived = new StatisticsCounter("data-received-" + getConnectionId());
}
- public boolean isStatisticsEnabled()
- {
- return _statisticsEnabled;
- }
-
- public void setStatisticsEnabled(boolean enabled)
- {
- _statisticsEnabled = enabled;
- }
-
/**
* @return authorizedSubject
*/
@@ -389,7 +396,7 @@ public class ServerConnection extends Co
else
{
_authorizedSubject = authorizedSubject;
- _authorizedPrincipal = UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+ _authorizedPrincipal = authorizedSubject.getPrincipals().iterator().next();
}
}
@@ -408,6 +415,11 @@ public class ServerConnection extends Co
return !super.hasSessionWithName(name);
}
+ public String getRemoteAddressString()
+ {
+ return getConfig().getAddress();
+ }
+
public String getUserName()
{
return _authorizedPrincipal.getName();
@@ -436,12 +448,6 @@ public class ServerConnection extends Co
}
}
-
- public ManagedObject getManagedObject()
- {
- return _mBean;
- }
-
@Override
public void send(ProtocolEvent event)
{
@@ -449,53 +455,38 @@ public class ServerConnection extends Co
super.send(event);
}
- public AtomicLong getLastIoTime()
+ public long getLastIoTime()
{
- return _lastIoTime;
+ return _lastIoTime.longValue();
}
- void checkForNotification()
+ public String getClientId()
{
- int channelsCount = getSessionModels().size();
- if (_mBean != null && channelsCount >= getConnectionDelegate().getChannelMax())
- {
- _mBean.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
- }
+ return getConnectionDelegate().getClientId();
}
- private void registerConnectionMbean()
+ public String getClientVersion()
{
- try
- {
- _mBean = new ServerConnectionMBean(this);
- _mBean.register();
- }
- catch (JMException jme)
- {
- log.error("Unable to register mBean for ServerConnection", jme);
- }
+ return getConnectionDelegate().getClientVersion();
}
- private void unregisterConnectionMbean()
+ public String getPrincipalAsString()
{
- if (_mBean != null)
- {
- if (log.isDebugEnabled())
- {
- log.debug("Unregistering mBean for ServerConnection" + _mBean);
- }
- _mBean.unregister();
- _mBean = null;
- }
+ return getAuthorizedPrincipal().getName();
}
- public String getClientId()
+ public long getSessionCountLimit()
{
- return getConnectionDelegate().getClientId();
+ return getChannelMax();
}
- public String getClientVersion()
+ public Principal getPeerPrincipal()
{
- return getConnectionDelegate().getClientVersion();
+ return _peerPrincipal;
+ }
+
+ public void setPeerPrincipal(Principal peerPrincipal)
+ {
+ _peerPrincipal = peerPrincipal;
}
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/transport/ServerConnectionDelegate.java Fri Aug 3 12:13:32 2012
@@ -20,18 +20,26 @@
*/
package org.apache.qpid.server.transport;
-import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD;
-
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
import org.apache.qpid.common.ServerPropertyNames;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.configuration.BrokerConfig;
import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
+import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
import org.apache.qpid.server.subscription.Subscription_0_10;
import org.apache.qpid.server.virtualhost.State;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -39,16 +47,7 @@ import org.apache.qpid.transport.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.sasl.SaslException;
-import javax.security.sasl.SaslServer;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
+import static org.apache.qpid.transport.Connection.State.CLOSE_RCVD;
public class ServerConnectionDelegate extends ServerDelegate
{
@@ -58,22 +57,25 @@ public class ServerConnectionDelegate ex
private final IApplicationRegistry _appRegistry;
private int _maxNoOfChannels;
private Map<String,Object> _clientProperties;
+ private final AuthenticationManager _authManager;
- public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN)
+ public ServerConnectionDelegate(IApplicationRegistry appRegistry, String localFQDN, AuthenticationManager authManager)
{
- this(createConnectionProperties(appRegistry.getBroker()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN);
+ this(createConnectionProperties(appRegistry.getBrokerConfig()), Collections.singletonList((Object)"en_US"), appRegistry, localFQDN, authManager);
}
- public ServerConnectionDelegate(Map<String, Object> properties,
+ private ServerConnectionDelegate(Map<String, Object> properties,
List<Object> locales,
IApplicationRegistry appRegistry,
- String localFQDN)
+ String localFQDN,
+ AuthenticationManager authManager)
{
- super(properties, parseToList(appRegistry.getAuthenticationManager().getMechanisms()), locales);
-
+ super(properties, parseToList(authManager.getMechanisms()), locales);
+
_appRegistry = appRegistry;
_localFQDN = localFQDN;
- _maxNoOfChannels = ApplicationRegistry.getInstance().getConfiguration().getMaxChannelCount();
+ _maxNoOfChannels = appRegistry.getConfiguration().getMaxChannelCount();
+ _authManager = authManager;
}
private static Map<String, Object> createConnectionProperties(final BrokerConfig brokerConfig)
@@ -108,18 +110,17 @@ public class ServerConnectionDelegate ex
return ssn;
}
- protected SaslServer createSaslServer(String mechanism) throws SaslException
+ protected SaslServer createSaslServer(Connection conn, String mechanism) throws SaslException
{
- return _appRegistry.getAuthenticationManager().createSaslServer(mechanism, _localFQDN);
+ return _authManager.createSaslServer(mechanism, _localFQDN, ((ServerConnection) conn).getPeerPrincipal());
}
protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
{
- final AuthenticationResult authResult = _appRegistry.getAuthenticationManager().authenticate(ss, response);
final ServerConnection sconn = (ServerConnection) conn;
-
-
+ final AuthenticationResult authResult = _authManager.authenticate(ss, response);
+
if (AuthenticationStatus.SUCCESS.equals(authResult.getStatus()))
{
tuneAuthorizedConnection(sconn);
@@ -168,7 +169,7 @@ public class ServerConnectionDelegate ex
vhost = _appRegistry.getVirtualHostRegistry().getVirtualHost(vhostName);
SecurityManager.setThreadSubject(sconn.getAuthorizedSubject());
-
+
if(vhost != null)
{
sconn.setVirtualHost(vhost);
@@ -194,7 +195,7 @@ public class ServerConnectionDelegate ex
sconn.setState(Connection.State.CLOSING);
sconn.invoke(new ConnectionClose(ConnectionCloseCode.INVALID_PATH, "Unknown virtualhost '"+vhostName+"'"));
}
-
+
}
@Override
@@ -216,7 +217,7 @@ public class ServerConnectionDelegate ex
setConnectionTuneOkChannelMax(sconn, okChannelMax);
}
-
+
@Override
protected int getHeartbeatMax()
{
@@ -225,7 +226,7 @@ public class ServerConnectionDelegate ex
}
@Override
- protected int getChannelMax()
+ public int getChannelMax()
{
return _maxNoOfChannels;
}
@@ -265,7 +266,6 @@ public class ServerConnectionDelegate ex
if(isSessionNameUnique(atc.getName(), conn))
{
super.sessionAttach(conn, atc);
- ((ServerConnection)conn).checkForNotification();
}
else
{
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org