You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2013/02/28 17:14:57 UTC
svn commit: r1451244 [28/45] - in /qpid/branches/asyncstore: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/ cpp/bindings/qmf/python/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf2/rub...
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java Thu Feb 28 16:14:30 2013
@@ -23,7 +23,6 @@ package org.apache.qpid.server.protocol;
import java.io.PrintWriter;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
-import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.security.sasl.SaslException;
@@ -40,12 +39,10 @@ import org.apache.qpid.amqp_1_0.transpor
import org.apache.qpid.amqp_1_0.type.Binary;
import org.apache.qpid.amqp_1_0.type.FrameBody;
import org.apache.qpid.protocol.ServerProtocolEngine;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConnectionConfigType;
+import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v1_0.Connection_1_0;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
+import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.virtualhost.VirtualHost;
import org.apache.qpid.transport.Sender;
import org.apache.qpid.transport.network.NetworkConnection;
@@ -53,8 +50,10 @@ public class ProtocolEngine_1_0_0_SASL i
{
private long _readBytes;
private long _writtenBytes;
- private final UUID _id;
- private final IApplicationRegistry _appRegistry;
+
+ private long _lastReadTime;
+ private long _lastWriteTime;
+ private final Broker _broker;
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _conn;
private long _connectionId;
@@ -113,13 +112,11 @@ public class ProtocolEngine_1_0_0_SASL i
private State _state = State.A;
- public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final IApplicationRegistry appRegistry,
+ public ProtocolEngine_1_0_0_SASL(final NetworkConnection networkDriver, final Broker broker,
long id)
{
- _id = appRegistry.getConfigStore().createId();
_connectionId = id;
- _appRegistry = appRegistry;
-
+ _broker = broker;
if(networkDriver != null)
{
setNetworkConnection(networkDriver, networkDriver.getSender());
@@ -162,21 +159,17 @@ public class ProtocolEngine_1_0_0_SASL i
_network = network;
_sender = sender;
- Container container = new Container(_appRegistry.getBrokerId().toString());
+ Container container = new Container(_broker.getId().toString());
- _conn = new ConnectionEndpoint(container, asSaslServerProvider(ApplicationRegistry.getInstance()
- .getAuthenticationManager(getLocalAddress())));
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
+ VirtualHost virtualHost = _broker.getVirtualHostRegistry().getVirtualHost((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST));
+ _conn = new ConnectionEndpoint(container, asSaslServerProvider(_broker.getSubjectCreator(getLocalAddress())));
_conn.setRemoteAddress(getRemoteAddress());
-
-
+ _conn.setConnectionEventListener(new Connection_1_0(virtualHost, _conn, _connectionId));
_conn.setFrameOutputHandler(this);
_conn.setSaslFrameOutput(this);
_conn.setOnSaslComplete(new Runnable()
{
-
-
public void run()
{
if(_conn.isAuthenticated())
@@ -201,14 +194,14 @@ public class ProtocolEngine_1_0_0_SASL i
}
- private SaslServerProvider asSaslServerProvider(final AuthenticationManager authenticationManager)
+ private SaslServerProvider asSaslServerProvider(final SubjectCreator subjectCreator)
{
return new SaslServerProvider()
{
@Override
public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException
{
- return authenticationManager.createSaslServer(mechanism, fqdn, null);
+ return subjectCreator.createSaslServer(mechanism, fqdn, null);
}
};
}
@@ -218,22 +211,6 @@ public class ProtocolEngine_1_0_0_SASL i
return getRemoteAddress().toString();
}
-
- public ConfigStore getConfigStore()
- {
- return _appRegistry.getConfigStore();
- }
-
- public UUID getId()
- {
- return _id;
- }
-
- public ConnectionConfigType getConfigType()
- {
- return ConnectionConfigType.getInstance();
- }
-
public boolean isDurable()
{
return false;
@@ -244,6 +221,7 @@ public class ProtocolEngine_1_0_0_SASL i
public synchronized void received(ByteBuffer msg)
{
+ _lastReadTime = System.currentTimeMillis();
if(RAW_LOGGER.isLoggable(Level.FINE))
{
ByteBuffer dup = msg.duplicate();
@@ -386,17 +364,14 @@ public class ProtocolEngine_1_0_0_SASL i
synchronized(_sendLock)
{
-
+ _lastWriteTime = System.currentTimeMillis();
if(FRAME_LOGGER.isLoggable(Level.FINE))
{
FRAME_LOGGER.fine("SEND[" + getRemoteAddress() + "|" + amqFrame.getChannel() + "] : " + amqFrame.getFrameBody());
}
-
_frameWriter.setValue(amqFrame);
-
-
ByteBuffer dup = ByteBuffer.allocate(_conn.getMaxFrameSize());
int size = _frameWriter.writeToBuffer(dup);
@@ -447,4 +422,13 @@ public class ProtocolEngine_1_0_0_SASL i
return _connectionId;
}
+ public long getLastReadTime()
+ {
+ return _lastReadTime;
+ }
+
+ public long getLastWriteTime()
+ {
+ return _lastWriteTime;
+ }
}
Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java:r1375509-1450773
Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:r1375509-1450773
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Thu Feb 28 16:14:30 2013
@@ -31,7 +31,6 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.registry.IApplicationRegistry;
import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -44,7 +43,6 @@ import static org.apache.qpid.server.log
public class Connection_1_0 implements ConnectionEventListener
{
- private IApplicationRegistry _appRegistry;
private VirtualHost _vhost;
private final ConnectionEndpoint _conn;
private final long _connectionId;
@@ -62,10 +60,9 @@ public class Connection_1_0 implements C
- public Connection_1_0(IApplicationRegistry appRegistry, ConnectionEndpoint conn, long connectionId)
+ public Connection_1_0(VirtualHost virtualHost, ConnectionEndpoint conn, long connectionId)
{
- _appRegistry = appRegistry;
- _vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost();
+ _vhost = virtualHost;
_conn = conn;
_connectionId = connectionId;
_vhost.getConnectionRegistry().registerConnection(_model);
@@ -74,7 +71,7 @@ public class Connection_1_0 implements C
public void remoteSessionCreation(SessionEndpoint endpoint)
{
- Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this);
+ Session_1_0 session = new Session_1_0(_vhost, this);
_sessions.add(session);
endpoint.setSessionEventListener(session);
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Thu Feb 28 16:14:30 2013
@@ -80,7 +80,7 @@ public class ExchangeDestination impleme
{
// NO-OP
}
- }, System.currentTimeMillis());
+ });
return ACCEPTED;
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Thu Feb 28 16:14:30 2013
@@ -23,9 +23,10 @@ package org.apache.qpid.server.protocol.
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageMetaData_1_0;
import org.apache.qpid.server.message.MessageReference;
@@ -34,11 +35,45 @@ import org.apache.qpid.server.store.Stor
public class Message_1_0 implements ServerMessage, InboundMessage
{
+
+
+ private static final AtomicIntegerFieldUpdater<Message_1_0> _refCountUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(Message_1_0.class, "_referenceCount");
+
+ private volatile int _referenceCount = 0;
+
private final StoredMessage<MessageMetaData_1_0> _storedMessage;
private List<ByteBuffer> _fragments;
private WeakReference<Session_1_0> _session;
+ public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage)
+ {
+ _storedMessage = storedMessage;
+ _session = null;
+ _fragments = restoreFragments(storedMessage);
+ }
+
+ private static List<ByteBuffer> restoreFragments(StoredMessage<MessageMetaData_1_0> storedMessage)
+ {
+ ArrayList<ByteBuffer> fragments = new ArrayList<ByteBuffer>();
+ final int FRAGMENT_SIZE = 2048;
+ int offset = 0;
+ ByteBuffer b;
+ do
+ {
+
+ b = storedMessage.getContent(offset,FRAGMENT_SIZE);
+ if(b.hasRemaining())
+ {
+ fragments.add(b);
+ offset+= b.remaining();
+ }
+ }
+ while(b.hasRemaining());
+ return fragments;
+ }
+
public Message_1_0(final StoredMessage<MessageMetaData_1_0> storedMessage,
final List<ByteBuffer> fragments,
final Session_1_0 session)
@@ -136,11 +171,6 @@ public class Message_1_0 implements Serv
return buf;
}
- public SessionConfig getSessionConfig()
- {
- return null; //TODO
- }
-
public List<ByteBuffer> getFragments()
{
return _fragments;
@@ -148,7 +178,61 @@ public class Message_1_0 implements Serv
public Session_1_0 getSession()
{
- return _session.get();
+ return _session == null ? null : _session.get();
+ }
+
+
+ public boolean incrementReference()
+ {
+ if(_refCountUpdater.incrementAndGet(this) <= 0)
+ {
+ _refCountUpdater.decrementAndGet(this);
+ return false;
+ }
+ else
+ {
+ return true;
+ }
+ }
+
+ /**
+ * Threadsafe. This will decrement the reference count and when it reaches zero will remove the message from the
+ * message store.
+ *
+ *
+ * @throws org.apache.qpid.server.queue.MessageCleanupException when an attempt was made to remove the message from the message store and that
+ * failed
+ */
+ public void decrementReference()
+ {
+ int count = _refCountUpdater.decrementAndGet(this);
+
+ // note that the operation of decrementing the reference count and then removing the message does not
+ // have to be atomic since the ref count starts at 1 and the exchange itself decrements that after
+ // the message has been passed to all queues. i.e. we are
+ // not relying on the all the increments having taken place before the delivery manager decrements.
+ if (count == 0)
+ {
+ // set the reference count way below 0 so that we can detect that the message has been deleted
+ // this is to guard against the message being spontaneously recreated (from the mgmt console)
+ // by copying from other queues at the same time as it is being removed.
+ _refCountUpdater.set(this,Integer.MIN_VALUE/2);
+
+ // must check if the handle is null since there may be cases where we decide to throw away a message
+ // and the handle has not yet been constructed
+ if (_storedMessage != null)
+ {
+ _storedMessage.remove();
+ }
+ }
+ else
+ {
+ if (count < 0)
+ {
+ throw new RuntimeException("Reference count for message id " + getMessageNumber()
+ + " has gone below 0.");
+ }
+ }
}
public static class Reference extends MessageReference<Message_1_0>
@@ -160,13 +244,13 @@ public class Message_1_0 implements Serv
protected void onReference(Message_1_0 message)
{
-
+ message.incrementReference();
}
protected void onRelease(Message_1_0 message)
{
-
+ message.decrementReference();
}
-}
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Thu Feb 28 16:14:30 2013
@@ -36,7 +36,6 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
-import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.message.InboundMessage;
@@ -45,8 +44,6 @@ import org.apache.qpid.server.protocol.A
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -58,7 +55,6 @@ import static org.apache.qpid.server.log
public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
{
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
- private IApplicationRegistry _appRegistry;
private VirtualHost _vhost;
private AutoCommitTransaction _transaction;
@@ -68,9 +64,8 @@ public class Session_1_0 implements Sess
private UUID _id = UUID.randomUUID();
- public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection)
+ public Session_1_0(VirtualHost vhost, final Connection_1_0 connection)
{
- _appRegistry = appRegistry;
_vhost = vhost;
_transaction = new AutoCommitTransaction(vhost.getMessageStore());
_connection = connection;
@@ -456,8 +451,9 @@ public class Session_1_0 implements Sess
{
}
+
@Override
- public UUID getQMFId()
+ public UUID getId()
{
return _id;
}
@@ -580,13 +576,6 @@ public class Session_1_0 implements Sess
return 0;
}
- @Override
- public int compareTo(AMQSessionModel o)
- {
- return getQMFId().compareTo(o.getQMFId());
- }
-
-
public String toLogString()
{
@@ -604,4 +593,9 @@ public class Session_1_0 implements Sess
+ "] ";
}
+ @Override
+ public int compareTo(AMQSessionModel o)
+ {
+ return getId().compareTo(o.getId());
+ }
}
Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:r1375509-1450773
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Thu Feb 28 16:14:30 2013
@@ -23,8 +23,7 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.QueueConfig;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.QueueConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
@@ -39,9 +38,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue,
- QueueConfig
+public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue
{
+ String getName();
+
public interface NotificationListener
{
void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
@@ -277,9 +277,7 @@ public interface AMQQueue extends Compar
public void doTask(AMQQueue queue) throws AMQException;
}
- void configure(ConfigurationPlugin config);
-
- ConfigurationPlugin getConfiguration();
+ void configure(QueueConfiguration config);
void setExclusive(boolean exclusive);
@@ -315,4 +313,18 @@ public interface AMQQueue extends Compar
*/
String getDescription();
+ long getPersistentByteDequeues();
+
+ long getPersistentMsgDequeues();
+
+ long getPersistentByteEnqueues();
+
+ long getPersistentMsgEnqueues();
+
+ long getTotalDequeueSize();
+
+ long getTotalEnqueueSize();
+
+ long getUnackedMessageCount();
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Thu Feb 28 16:14:30 2013
@@ -30,17 +30,25 @@ import org.apache.qpid.AMQSecurityExcept
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
public class AMQQueueFactory
{
+ public static final String X_QPID_FLOW_RESUME_CAPACITY = "x-qpid-flow-resume-capacity";
+ public static final String X_QPID_CAPACITY = "x-qpid-capacity";
+ public static final String X_QPID_MINIMUM_ALERT_REPEAT_GAP = "x-qpid-minimum-alert-repeat-gap";
+ public static final String X_QPID_MAXIMUM_MESSAGE_COUNT = "x-qpid-maximum-message-count";
+ public static final String X_QPID_MAXIMUM_MESSAGE_SIZE = "x-qpid-maximum-message-size";
+ public static final String X_QPID_MAXIMUM_MESSAGE_AGE = "x-qpid-maximum-message-age";
+ public static final String X_QPID_MAXIMUM_QUEUE_DEPTH = "x-qpid-maximum-queue-depth";
+
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
public static final String X_QPID_DESCRIPTION = "x-qpid-description";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
@@ -119,42 +127,49 @@ public class AMQQueueFactory
}
private static final QueueProperty[] DECLAREABLE_PROPERTIES = {
- new QueueLongProperty("x-qpid-maximum-message-age")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_AGE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageAge(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-size")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_SIZE)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageSize(value);
}
},
- new QueueLongProperty("x-qpid-maximum-message-count")
+ new QueueLongProperty(X_QPID_MAXIMUM_MESSAGE_COUNT)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMaximumMessageCount(value);
}
},
- new QueueLongProperty("x-qpid-minimum-alert-repeat-gap")
+ new QueueLongProperty(X_QPID_MAXIMUM_QUEUE_DEPTH)
+ {
+ public void setPropertyValue(AMQQueue queue, long value)
+ {
+ queue.setMaximumQueueDepth(value);
+ }
+ },
+ new QueueLongProperty(X_QPID_MINIMUM_ALERT_REPEAT_GAP)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setMinimumAlertRepeatGap(value);
}
},
- new QueueLongProperty("x-qpid-capacity")
+ new QueueLongProperty(X_QPID_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
queue.setCapacity(value);
}
},
- new QueueLongProperty("x-qpid-flow-resume-capacity")
+ new QueueLongProperty(X_QPID_FLOW_RESUME_CAPACITY)
{
public void setPropertyValue(AMQQueue queue, long value)
{
@@ -411,9 +426,7 @@ public class AMQQueueFactory
*/
protected static String getDeadLetterQueueName(String name)
{
- ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
- String dlQueueName = name + serverConfig.getDeadLetterQueueSuffix();
- return dlQueueName;
+ return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_QUEUE_SUFFIX, DEFAULT_DLQ_NAME_SUFFIX);
}
/**
@@ -425,9 +438,7 @@ public class AMQQueueFactory
*/
protected static String getDeadLetterExchangeName(String name)
{
- ServerConfiguration serverConfig = ApplicationRegistry.getInstance().getConfiguration();
- String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
- return dlExchangeName;
+ return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
}
private static Map<String, Object> createQueueArgumentsFromConfig(QueueConfiguration config)
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/InboundMessageAdapter.java Thu Feb 28 16:14:30 2013
@@ -47,7 +47,7 @@ public class InboundMessageAdapter imple
public AMQShortString getRoutingKeyShortString()
{
- return AMQShortString.valueOf(_entry.getMessage());
+ return AMQShortString.valueOf(_entry.getMessage().getRoutingKey());
}
public String getRoutingKey()
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/IncomingMessage.java Thu Feb 28 16:14:30 2013
@@ -34,7 +34,6 @@ import org.apache.qpid.server.message.En
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.MessageContentSource;
import org.apache.qpid.server.message.MessageMetaData;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.StoredMessage;
import java.nio.ByteBuffer;
@@ -47,9 +46,6 @@ public class IncomingMessage implements
/** Used for debugging purposes. */
private static final Logger _logger = Logger.getLogger(IncomingMessage.class);
- private static final boolean SYNCHED_CLOCKS =
- ApplicationRegistry.getInstance().getConfiguration().getSynchedClocks();
-
private final MessagePublishInfo _messagePublishInfo;
private ContentHeaderBody _contentHeaderBody;
@@ -101,33 +97,7 @@ public class IncomingMessage implements
public void setExpiration()
{
- long expiration =
- ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
- long timestamp =
- ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getTimestamp();
-
- if (SYNCHED_CLOCKS)
- {
- _expiration = expiration;
- }
- else
- {
- // Update TTL to be in broker time.
- if (expiration != 0L)
- {
- if (timestamp != 0L)
- {
- // todo perhaps use arrival time
- long diff = (System.currentTimeMillis() - timestamp);
-
- if ((diff > 1000L) || (diff < 1000L))
- {
- _expiration = expiration + diff;
- }
- }
- }
- }
-
+ _expiration = ((BasicContentHeaderProperties) _contentHeaderBody.getProperties()).getExpiration();
}
public MessageMetaData headersReceived(long currentTime)
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java Thu Feb 28 16:14:30 2013
@@ -454,7 +454,7 @@ public abstract class QueueEntryImpl imp
{
}
- }, 0L);
+ });
txn.dequeue(currentQueue, message, new ServerTransaction.Action()
{
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java Thu Feb 28 16:14:30 2013
@@ -41,11 +41,8 @@ import org.apache.qpid.AMQSecurityExcept
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.pool.ReferenceCountingExecutorService;
import org.apache.qpid.server.binding.Binding;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfiguredObject;
-import org.apache.qpid.server.configuration.QueueConfigType;
import org.apache.qpid.server.configuration.QueueConfiguration;
-import org.apache.qpid.server.configuration.plugins.ConfigurationPlugin;
+import org.apache.qpid.server.configuration.plugins.AbstractConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
@@ -55,7 +52,6 @@ import org.apache.qpid.server.logging.me
import org.apache.qpid.server.logging.subjects.QueueLogSubject;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.subscription.AssignedSubscriptionMessageGroupManager;
import org.apache.qpid.server.subscription.DefinedGroupMessageGroupManager;
@@ -135,23 +131,23 @@ public class SimpleAMQQueue implements A
private final AtomicInteger _bindingCountHigh = new AtomicInteger();
/** max allowed size(KB) of a single message */
- private long _maximumMessageSize = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageSize();
+ private long _maximumMessageSize;
/** max allowed number of messages on a queue. */
- private long _maximumMessageCount = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageCount();
+ private long _maximumMessageCount;
/** max queue depth for the queue */
- private long _maximumQueueDepth = ApplicationRegistry.getInstance().getConfiguration().getMaximumQueueDepth();
+ private long _maximumQueueDepth;
/** maximum message age before alerts occur */
- private long _maximumMessageAge = ApplicationRegistry.getInstance().getConfiguration().getMaximumMessageAge();
+ private long _maximumMessageAge;
/** the minimum interval between sending out consecutive alerts of the same type */
- private long _minimumAlertRepeatGap = ApplicationRegistry.getInstance().getConfiguration().getMinimumAlertRepeatGap();
+ private long _minimumAlertRepeatGap;
- private long _capacity = ApplicationRegistry.getInstance().getConfiguration().getCapacity();
+ private long _capacity;
- private long _flowResumeCapacity = ApplicationRegistry.getInstance().getConfiguration().getFlowResumeCapacity();
+ private long _flowResumeCapacity;
private final Set<NotificationCheck> _notificationChecks = EnumSet.noneOf(NotificationCheck.class);
@@ -185,11 +181,10 @@ public class SimpleAMQQueue implements A
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
- private UUID _qmfId;
- private ConfigurationPlugin _queueConfiguration;
+ private AbstractConfiguration _queueConfiguration;
/** the maximum delivery count for each message on this queue or 0 if maximum delivery count is not to be enforced. */
- private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
+ private int _maximumDeliveryCount;
private final MessageGroupManager _messageGroupManager;
private final Collection<SubscriptionRegistrationListener> _subscriptionListeners =
@@ -243,7 +238,6 @@ public class SimpleAMQQueue implements A
_arguments = arguments == null ? new HashMap<String, Object>() : new HashMap<String, Object>(arguments);
_id = id;
- _qmfId = getConfigStore().createId();
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
_logSubject = new QueueLogSubject(this);
@@ -259,8 +253,6 @@ public class SimpleAMQQueue implements A
durable, !durable,
_entries.getPriorities() > 0));
- getConfigStore().addConfiguredObject(this);
-
if(arguments != null && arguments.containsKey(QPID_GROUP_HEADER_KEY))
{
if(arguments.containsKey(QPID_SHARED_MSG_GROUP) && String.valueOf(arguments.get(QPID_SHARED_MSG_GROUP)).equals("1"))
@@ -331,22 +323,6 @@ public class SimpleAMQQueue implements A
return _id;
}
- @Override
- public UUID getQMFId()
- {
- return _qmfId;
- }
-
- public QueueConfigType getConfigType()
- {
- return QueueConfigType.getInstance();
- }
-
- public ConfiguredObject getParent()
- {
- return getVirtualHost();
- }
-
public boolean isDurable()
{
return _durable;
@@ -621,24 +597,6 @@ public class SimpleAMQQueue implements A
break;
}
}
-
- reconfigure();
- }
-
- private void reconfigure()
- {
- //Reconfigure the queue for to reflect this new binding.
- ConfigurationPlugin config = getVirtualHost().getConfiguration().getQueueConfiguration(this);
-
- if (config != null)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Reconfiguring queue(" + this + ") with config:" + config + " was "+ _queueConfiguration);
- }
- // Reconfigure with new config.
- configure(config);
- }
}
public int getBindingCountHigh()
@@ -649,8 +607,6 @@ public class SimpleAMQQueue implements A
public void removeBinding(final Binding binding)
{
_bindings.remove(binding);
-
- reconfigure();
}
public List<Binding> getBindings()
@@ -1383,7 +1339,6 @@ public class SimpleAMQQueue implements A
}
_virtualHost.getQueueRegistry().unregisterQueue(_name);
- getConfigStore().removeConfiguredObject(this);
List<QueueEntry> entries = getMessagesOnTheQueue(new QueueEntryFilter()
{
@@ -1442,7 +1397,7 @@ public class SimpleAMQQueue implements A
{
}
- }, 0L);
+ });
txn.dequeue(this, entry.getMessage(),
new ServerTransaction.Action()
{
@@ -2161,39 +2116,21 @@ public class SimpleAMQQueue implements A
}
- public void configure(ConfigurationPlugin config)
+ public void configure(QueueConfiguration config)
{
if (config != null)
{
- if (config instanceof QueueConfiguration)
- {
-
- setMaximumMessageAge(((QueueConfiguration)config).getMaximumMessageAge());
- setMaximumQueueDepth(((QueueConfiguration)config).getMaximumQueueDepth());
- setMaximumMessageSize(((QueueConfiguration)config).getMaximumMessageSize());
- setMaximumMessageCount(((QueueConfiguration)config).getMaximumMessageCount());
- setMinimumAlertRepeatGap(((QueueConfiguration)config).getMinimumAlertRepeatGap());
- setMaximumDeliveryCount(((QueueConfiguration)config).getMaxDeliveryCount());
- _capacity = ((QueueConfiguration)config).getCapacity();
- _flowResumeCapacity = ((QueueConfiguration)config).getFlowResumeCapacity();
- }
-
- _queueConfiguration = config;
-
+ setMaximumMessageAge(config.getMaximumMessageAge());
+ setMaximumQueueDepth(config.getMaximumQueueDepth());
+ setMaximumMessageSize(config.getMaximumMessageSize());
+ setMaximumMessageCount(config.getMaximumMessageCount());
+ setMinimumAlertRepeatGap(config.getMinimumAlertRepeatGap());
+ setMaximumDeliveryCount(config.getMaxDeliveryCount());
+ _capacity = config.getCapacity();
+ _flowResumeCapacity = config.getFlowResumeCapacity();
}
}
-
- public ConfigurationPlugin getConfiguration()
- {
- return _queueConfiguration;
- }
-
- public ConfigStore getConfigStore()
- {
- return getVirtualHost().getConfigStore();
- }
-
public long getMessageDequeueCount()
{
return _dequeueCount.get();
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/ApplicationRegistry.java Thu Feb 28 16:14:30 2013
@@ -20,44 +20,39 @@
*/
package org.apache.qpid.server.registry;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.log4j.Logger;
-import org.apache.qpid.server.logging.*;
-import org.osgi.framework.BundleContext;
+import java.util.Collection;
+import java.util.Timer;
+import java.util.TimerTask;
+import org.apache.log4j.Logger;
import org.apache.qpid.common.Closeable;
import org.apache.qpid.common.QpidProperties;
-import org.apache.qpid.qmf.QMFService;
-import org.apache.qpid.server.configuration.BrokerConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfigurationManager;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.SystemConfig;
-import org.apache.qpid.server.configuration.SystemConfigImpl;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.ConfigurationEntryStore;
+import org.apache.qpid.server.configuration.ConfiguredObjectRecoverer;
+import org.apache.qpid.server.configuration.RecovererProvider;
+import org.apache.qpid.server.configuration.startup.DefaultRecovererProvider;
+import org.apache.qpid.server.logging.CompositeStartupMessageLogger;
+import org.apache.qpid.server.logging.Log4jMessageLogger;
+import org.apache.qpid.server.logging.LogActor;
+import org.apache.qpid.server.logging.LogRecorder;
+import org.apache.qpid.server.logging.RootMessageLogger;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.actors.AbstractActor;
import org.apache.qpid.server.logging.actors.BrokerActor;
import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.logging.messages.VirtualHostMessages;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.adapter.BrokerAdapter;
-import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManagerRegistry;
-import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.stats.StatisticsCounter;
-import org.apache.qpid.server.transport.QpidAcceptor;
+import org.apache.qpid.server.stats.StatisticsGatherer;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicReference;
-
/**
* An abstract application registry that provides access to configuration information and handles the
@@ -65,321 +60,90 @@ import java.util.concurrent.atomic.Atomi
* <p/>
* Subclasses should handle the construction of the "registered objects" such as the exchange registry.
*/
-public abstract class ApplicationRegistry implements IApplicationRegistry
+public class ApplicationRegistry implements IApplicationRegistry
{
-
private static final Logger _logger = Logger.getLogger(ApplicationRegistry.class);
- private static AtomicReference<IApplicationRegistry> _instance = new AtomicReference<IApplicationRegistry>(null);
-
- private final ServerConfiguration _configuration;
-
- private final Map<InetSocketAddress, QpidAcceptor> _acceptors =
- Collections.synchronizedMap(new HashMap<InetSocketAddress, QpidAcceptor>());
-
- private IAuthenticationManagerRegistry _authenticationManagerRegistry;
-
- private final VirtualHostRegistry _virtualHostRegistry = new VirtualHostRegistry(this);
-
- private SecurityManager _securityManager;
+ private final VirtualHostRegistry _virtualHostRegistry = new VirtualHostRegistry();
- private PluginManager _pluginManager;
-
- private ConfigurationManager _configurationManager;
-
- private RootMessageLogger _rootMessageLogger;
-
- private CompositeStartupMessageLogger _startupMessageLogger;
-
- private UUID _brokerId = UUID.randomUUID();
-
- private QMFService _qmfService;
-
- private BrokerConfig _brokerConfig;
+ private volatile RootMessageLogger _rootMessageLogger;
private Broker _broker;
- private ConfigStore _configStore;
-
private Timer _reportingTimer;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
- private BundleContext _bundleContext;
-
- private final List<PortBindingListener> _portBindingListeners = new ArrayList<PortBindingListener>();
-
- private int _httpManagementPort = -1, _httpsManagementPort = -1;
-
private LogRecorder _logRecorder;
- private List<IAuthenticationManagerRegistry.RegistryChangeListener> _authManagerChangeListeners =
- new ArrayList<IAuthenticationManagerRegistry.RegistryChangeListener>();
-
- public Map<InetSocketAddress, QpidAcceptor> getAcceptors()
- {
- synchronized (_acceptors)
- {
- return new HashMap<InetSocketAddress, QpidAcceptor>(_acceptors);
- }
- }
-
- protected void setSecurityManager(SecurityManager securityManager)
- {
- _securityManager = securityManager;
- }
-
- protected void setPluginManager(PluginManager pluginManager)
- {
- _pluginManager = pluginManager;
- }
-
- protected void setConfigurationManager(ConfigurationManager configurationManager)
- {
- _configurationManager = configurationManager;
- }
+ private ConfigurationEntryStore _store;
+ private TaskExecutor _taskExecutor;
protected void setRootMessageLogger(RootMessageLogger rootMessageLogger)
{
_rootMessageLogger = rootMessageLogger;
}
- protected CompositeStartupMessageLogger getStartupMessageLogger()
- {
- return _startupMessageLogger;
- }
-
- protected void setStartupMessageLogger(CompositeStartupMessageLogger startupMessageLogger)
+ public ApplicationRegistry(ConfigurationEntryStore store)
{
- _startupMessageLogger = startupMessageLogger;
- }
-
- protected void setBrokerId(UUID brokerId)
- {
- _brokerId = brokerId;
- }
-
- protected QMFService getQmfService()
- {
- return _qmfService;
- }
-
- protected void setQmfService(QMFService qmfService)
- {
- _qmfService = qmfService;
- }
-
- public static void initialise(IApplicationRegistry instance) throws Exception
- {
- if(instance == null)
- {
- throw new IllegalArgumentException("ApplicationRegistry instance must not be null");
- }
-
- if(!_instance.compareAndSet(null, instance))
- {
- throw new IllegalStateException("An ApplicationRegistry is already initialised");
- }
-
- _logger.info("Initialising Application Registry(" + instance + ")");
-
-
- final ConfigStore store = ConfigStore.newInstance();
- store.setRoot(new SystemConfigImpl(store));
- instance.setConfigStore(store);
-
- final BrokerConfig brokerConfig = new BrokerConfigAdapter(instance);
-
- final SystemConfig system = store.getRoot();
- system.addBroker(brokerConfig);
- instance.setBrokerConfig(brokerConfig);
-
- try
- {
- instance.initialise();
- }
- catch (Exception e)
- {
- _instance.set(null);
-
- //remove the Broker instance, then re-throw
- try
- {
- system.removeBroker(brokerConfig);
- }
- catch(Throwable t)
- {
- //ignore
- }
-
- throw e;
- }
- }
-
- public ConfigStore getConfigStore()
- {
- return _configStore;
- }
-
- public void setConfigStore(final ConfigStore configStore)
- {
- _configStore = configStore;
- }
-
- public static boolean isConfigured()
- {
- return _instance.get() != null;
- }
-
- public static void remove()
- {
- IApplicationRegistry instance = _instance.getAndSet(null);
- try
- {
- if (instance != null)
- {
- if (_logger.isInfoEnabled())
- {
- _logger.info("Shutting down ApplicationRegistry(" + instance + ")");
- }
- instance.close();
- }
- }
- catch (Exception e)
- {
- _logger.error("Error shutting down Application Registry(" + instance + "): " + e, e);
- }
- }
-
- protected ApplicationRegistry(ServerConfiguration configuration)
- {
- this(configuration, null);
- }
-
- protected ApplicationRegistry(ServerConfiguration configuration, BundleContext bundleContext)
- {
- _configuration = configuration;
- _bundleContext = bundleContext;
- }
-
- public void configure() throws ConfigurationException
- {
- _configurationManager = new ConfigurationManager();
-
- try
- {
- _pluginManager = new PluginManager(_configuration.getPluginDirectory(), _configuration.getCacheDirectory(), _bundleContext);
- }
- catch (Exception e)
- {
- throw new ConfigurationException(e);
- }
-
- _configuration.initialise();
+ _store = store;
+ initialiseStatistics();
}
public void initialise() throws Exception
{
+ // Create the RootLogger to be used during broker operation
+ boolean statusUpdatesEnabled = Boolean.parseBoolean(System.getProperty(BrokerProperties.PROPERTY_STATUS_UPDATES, "true"));
+ _rootMessageLogger = new Log4jMessageLogger(statusUpdatesEnabled);
+
_logRecorder = new LogRecorder();
- //Create the RootLogger to be used during broker operation
- _rootMessageLogger = new Log4jMessageLogger(_configuration);
//Create the composite (log4j+SystemOut MessageLogger to be used during startup
RootMessageLogger[] messageLoggers = {new SystemOutMessageLogger(), _rootMessageLogger};
- _startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers);
+ CompositeStartupMessageLogger startupMessageLogger = new CompositeStartupMessageLogger(messageLoggers);
- BrokerActor actor = new BrokerActor(_startupMessageLogger);
- CurrentActor.setDefault(actor);
+ BrokerActor actor = new BrokerActor(startupMessageLogger);
CurrentActor.set(actor);
-
+ CurrentActor.setDefault(actor);
+ GenericActor.setDefaultMessageLogger(_rootMessageLogger);
try
{
- initialiseStatistics();
-
- if(_configuration.getHTTPManagementEnabled())
- {
- _httpManagementPort = _configuration.getHTTPManagementPort();
- }
- if (_configuration.getHTTPSManagementEnabled())
- {
- _httpsManagementPort = _configuration.getHTTPSManagementPort();
- }
-
- _broker = new BrokerAdapter(this);
-
- configure();
-
- _qmfService = new QMFService(getConfigStore(), this);
-
logStartupMessages(CurrentActor.get());
- _securityManager = new SecurityManager(_configuration, _pluginManager);
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
- _authenticationManagerRegistry = createAuthenticationManagerRegistry(_configuration, _pluginManager);
+ RecovererProvider provider = new DefaultRecovererProvider((StatisticsGatherer)this, _virtualHostRegistry, _logRecorder, _rootMessageLogger, _taskExecutor);
+ ConfiguredObjectRecoverer<? extends ConfiguredObject> brokerRecoverer = provider.getRecoverer(Broker.class.getSimpleName());
+ _broker = (Broker) brokerRecoverer.create(provider, _store.getRootEntry());
- if(!_authManagerChangeListeners.isEmpty())
- {
- for(IAuthenticationManagerRegistry.RegistryChangeListener listener : _authManagerChangeListeners)
- {
+ _virtualHostRegistry.setDefaultVirtualHostName((String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST));
- _authenticationManagerRegistry.addRegistryChangeListener(listener);
- for(AuthenticationManager authMgr : _authenticationManagerRegistry.getAvailableAuthenticationManagers().values())
- {
- listener.authenticationManagerRegistered(authMgr);
- }
- }
- _authManagerChangeListeners.clear();
- }
- }
- finally
- {
- CurrentActor.remove();
- }
-
- CurrentActor.set(new BrokerActor(_rootMessageLogger));
- try
- {
- initialiseVirtualHosts();
initialiseStatisticsReporting();
+
+ // starting the broker
+ _broker.setDesiredState(State.INITIALISING, State.ACTIVE);
+
+ CurrentActor.get().message(BrokerMessages.READY());
}
finally
{
- // Startup complete, so pop the current actor
CurrentActor.remove();
}
- }
- protected IAuthenticationManagerRegistry createAuthenticationManagerRegistry(ServerConfiguration _configuration, PluginManager _pluginManager)
- throws ConfigurationException
- {
- return new AuthenticationManagerRegistry(_configuration, _pluginManager);
+ CurrentActor.setDefault(new BrokerActor(_rootMessageLogger));
}
- protected void initialiseVirtualHosts() throws Exception
+ private void initialiseStatisticsReporting()
{
- for (String name : _configuration.getVirtualHosts())
- {
- createVirtualHost(_configuration.getVirtualHostConfig(name));
- }
- getVirtualHostRegistry().setDefaultVirtualHostName(_configuration.getDefaultVirtualHost());
- }
-
- public void initialiseStatisticsReporting()
- {
- long report = _configuration.getStatisticsReportingPeriod() * 1000; // convert to ms
- final boolean broker = _configuration.isStatisticsGenerationBrokerEnabled();
- final boolean virtualhost = _configuration.isStatisticsGenerationVirtualhostsEnabled();
- final boolean reset = _configuration.isStatisticsReportResetEnabled();
+ long report = ((Number)_broker.getAttribute(Broker.STATISTICS_REPORTING_PERIOD)).intValue() * 1000; // convert to ms
+ final boolean reset = (Boolean)_broker.getAttribute(Broker.STATISTICS_REPORTING_RESET_ENABLED);
/* add a timer task to report statistics if generation is enabled for broker or virtualhosts */
- if (report > 0L && (broker || virtualhost))
+ if (report > 0L)
{
_reportingTimer = new Timer("Statistics-Reporting", true);
-
-
-
- _reportingTimer.scheduleAtFixedRate(new StatisticsReportingTask(broker, virtualhost, reset),
- report / 2,
- report);
+ StatisticsReportingTask task = new StatisticsReportingTask(reset, _rootMessageLogger);
+ _reportingTimer.scheduleAtFixedRate(task, report / 2, report);
}
}
@@ -388,76 +152,62 @@ public abstract class ApplicationRegistr
private final int DELIVERED = 0;
private final int RECEIVED = 1;
- private boolean _broker;
- private boolean _virtualhost;
- private boolean _reset;
+ private final boolean _reset;
+ private final RootMessageLogger _logger;
-
- public StatisticsReportingTask(boolean broker, boolean virtualhost, boolean reset)
+ public StatisticsReportingTask(boolean reset, RootMessageLogger logger)
{
- _broker = broker;
- _virtualhost = virtualhost;
_reset = reset;
+ _logger = logger;
}
public void run()
{
- CurrentActor.set(new AbstractActor(ApplicationRegistry.getInstance().getRootMessageLogger()) {
+ CurrentActor.set(new AbstractActor(_logger)
+ {
public String getLogMessage()
{
return "[" + Thread.currentThread().getName() + "] ";
}
});
-
- if (_broker)
+ try
{
CurrentActor.get().message(BrokerMessages.STATS_DATA(DELIVERED, _dataDelivered.getPeak() / 1024.0, _dataDelivered.getTotal()));
CurrentActor.get().message(BrokerMessages.STATS_MSGS(DELIVERED, _messagesDelivered.getPeak(), _messagesDelivered.getTotal()));
CurrentActor.get().message(BrokerMessages.STATS_DATA(RECEIVED, _dataReceived.getPeak() / 1024.0, _dataReceived.getTotal()));
CurrentActor.get().message(BrokerMessages.STATS_MSGS(RECEIVED, _messagesReceived.getPeak(), _messagesReceived.getTotal()));
- }
+ Collection<VirtualHost> hosts = _virtualHostRegistry.getVirtualHosts();
- if (_virtualhost)
- {
- for (VirtualHost vhost : getVirtualHostRegistry().getVirtualHosts())
+ if (hosts.size() > 1)
{
- String name = vhost.getName();
- StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
- StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
- StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
- StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
-
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
- CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+ for (VirtualHost vhost : hosts)
+ {
+ String name = vhost.getName();
+ StatisticsCounter dataDelivered = vhost.getDataDeliveryStatistics();
+ StatisticsCounter messagesDelivered = vhost.getMessageDeliveryStatistics();
+ StatisticsCounter dataReceived = vhost.getDataReceiptStatistics();
+ StatisticsCounter messagesReceived = vhost.getMessageReceiptStatistics();
+
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, DELIVERED, dataDelivered.getPeak() / 1024.0, dataDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, DELIVERED, messagesDelivered.getPeak(), messagesDelivered.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_DATA(name, RECEIVED, dataReceived.getPeak() / 1024.0, dataReceived.getTotal()));
+ CurrentActor.get().message(VirtualHostMessages.STATS_MSGS(name, RECEIVED, messagesReceived.getPeak(), messagesReceived.getTotal()));
+ }
}
- }
- if (_reset)
+ if (_reset)
+ {
+ resetStatistics();
+ }
+ }
+ catch(Exception e)
{
- resetStatistics();
+ ApplicationRegistry._logger.warn("Unexpected exception occured while reporting the statistics", e);
+ }
+ finally
+ {
+ CurrentActor.remove();
}
-
- CurrentActor.remove();
- }
- }
-
- /**
- * Get the ApplicationRegistry
- * @return the IApplicationRegistry instance
- * @throws IllegalStateException if no registry instance has been initialised.
- */
- public static IApplicationRegistry getInstance() throws IllegalStateException
- {
- IApplicationRegistry iApplicationRegistry = _instance.get();
- if (iApplicationRegistry == null)
- {
- throw new IllegalStateException("No ApplicationRegistry has been initialised");
- }
- else
- {
- return iApplicationRegistry;
}
}
@@ -488,7 +238,7 @@ public abstract class ApplicationRegistr
}
//Set the Actor for Broker Shutdown
- CurrentActor.set(new BrokerActor(getRootMessageLogger()));
+ CurrentActor.set(new BrokerActor(_rootMessageLogger));
try
{
//Stop Statistics Reporting
@@ -497,154 +247,34 @@ public abstract class ApplicationRegistr
_reportingTimer.cancel();
}
- //Stop incoming connections
- unbind();
+ if (_broker != null)
+ {
+ _broker.setDesiredState(_broker.getActualState(), State.STOPPED);
+ }
//Shutdown virtualhosts
close(_virtualHostRegistry);
- close(_authenticationManagerRegistry);
-
- close(_qmfService);
-
- close(_pluginManager);
-
- BrokerConfig broker = getBrokerConfig();
- if(broker != null)
+ if (_taskExecutor != null)
{
- broker.getSystem().removeBroker(broker);
+ _taskExecutor.stop();
}
CurrentActor.get().message(BrokerMessages.STOPPED());
- }
- finally
- {
- CurrentActor.remove();
- }
- }
-
- private void unbind()
- {
- List<QpidAcceptor> removedAcceptors = new ArrayList<QpidAcceptor>();
- synchronized (_acceptors)
- {
- for (InetSocketAddress bindAddress : _acceptors.keySet())
- {
- QpidAcceptor acceptor = _acceptors.get(bindAddress);
- removedAcceptors.add(acceptor);
- try
- {
- acceptor.getNetworkTransport().close();
- }
- catch (Throwable e)
- {
- _logger.error("Unable to close network driver due to:" + e.getMessage());
- }
+ _logRecorder.closeLogRecorder();
- CurrentActor.get().message(BrokerMessages.SHUTTING_DOWN(acceptor.toString(), bindAddress.getPort()));
- }
}
- synchronized (_portBindingListeners)
- {
- for(QpidAcceptor acceptor : removedAcceptors)
- {
- for(PortBindingListener listener : _portBindingListeners)
- {
- listener.unbound(acceptor);
- }
- }
- }
- }
-
- public ServerConfiguration getConfiguration()
- {
- return _configuration;
- }
-
- public void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor)
- {
- synchronized (_acceptors)
- {
- _acceptors.put(bindAddress, acceptor);
- }
- synchronized (_portBindingListeners)
+ finally
{
- for(PortBindingListener listener : _portBindingListeners)
+ if (_taskExecutor != null)
{
- listener.bound(acceptor, bindAddress);
+ _taskExecutor.stopImmediately();
}
+ CurrentActor.remove();
}
- }
-
- public VirtualHostRegistry getVirtualHostRegistry()
- {
- return _virtualHostRegistry;
- }
-
- public SecurityManager getSecurityManager()
- {
- return _securityManager;
- }
-
- @Override
- public AuthenticationManager getAuthenticationManager(SocketAddress address)
- {
- return _authenticationManagerRegistry.getAuthenticationManager(address);
- }
-
- @Override
- public IAuthenticationManagerRegistry getAuthenticationManagerRegistry()
- {
- return _authenticationManagerRegistry;
- }
-
- public PluginManager getPluginManager()
- {
- return _pluginManager;
- }
-
- public ConfigurationManager getConfigurationManager()
- {
- return _configurationManager;
- }
-
- public RootMessageLogger getRootMessageLogger()
- {
- return _rootMessageLogger;
- }
-
- public RootMessageLogger getCompositeStartupMessageLogger()
- {
- return _startupMessageLogger;
- }
-
- public UUID getBrokerId()
- {
- return _brokerId;
- }
-
- public QMFService getQMFService()
- {
- return _qmfService;
- }
-
- public BrokerConfig getBrokerConfig()
- {
- return _brokerConfig;
- }
-
- public void setBrokerConfig(final BrokerConfig broker)
- {
- _brokerConfig = broker;
- }
-
- public VirtualHost createVirtualHost(final VirtualHostConfiguration vhostConfig) throws Exception
- {
- VirtualHostImpl virtualHost = new VirtualHostImpl(this, vhostConfig);
- _virtualHostRegistry.registerVirtualHost(virtualHost);
- getBrokerConfig().addVirtualHost(virtualHost);
- return virtualHost;
+ _store = null;
+ _broker = null;
}
public void registerMessageDelivered(long messageSize)
@@ -713,60 +343,10 @@ public abstract class ApplicationRegistr
logActor.message(BrokerMessages.MAX_MEMORY(Runtime.getRuntime().maxMemory()));
}
+ @Override
public Broker getBroker()
{
return _broker;
}
- @Override
- public void addPortBindingListener(PortBindingListener listener)
- {
- synchronized (_portBindingListeners)
- {
- _portBindingListeners.add(listener);
- }
- }
-
-
- @Override
- public boolean useHTTPManagement()
- {
- return _httpManagementPort != -1;
- }
-
- @Override
- public int getHTTPManagementPort()
- {
- return _httpManagementPort;
- }
-
- @Override
- public boolean useHTTPSManagement()
- {
- return _httpsManagementPort != -1;
- }
-
- @Override
- public int getHTTPSManagementPort()
- {
- return _httpsManagementPort;
- }
-
- public LogRecorder getLogRecorder()
- {
- return _logRecorder;
- }
-
- @Override
- public void addRegistryChangeListener(IAuthenticationManagerRegistry.RegistryChangeListener registryChangeListener)
- {
- if(_authenticationManagerRegistry == null)
- {
- _authManagerChangeListeners.add(registryChangeListener);
- }
- else
- {
- _authenticationManagerRegistry.addRegistryChangeListener(registryChangeListener);
- }
- }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java Thu Feb 28 16:14:30 2013
@@ -20,27 +20,8 @@
*/
package org.apache.qpid.server.registry;
-import org.apache.qpid.qmf.QMFService;
-import org.apache.qpid.server.configuration.BrokerConfig;
-import org.apache.qpid.server.configuration.ConfigStore;
-import org.apache.qpid.server.configuration.ConfigurationManager;
-import org.apache.qpid.server.configuration.ServerConfiguration;
-import org.apache.qpid.server.configuration.VirtualHostConfiguration;
-import org.apache.qpid.server.logging.RootMessageLogger;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.plugins.PluginManager;
-import org.apache.qpid.server.security.SecurityManager;
-import org.apache.qpid.server.security.auth.manager.AuthenticationManager;
-import org.apache.qpid.server.security.auth.manager.IAuthenticationManagerRegistry;
import org.apache.qpid.server.stats.StatisticsGatherer;
-import org.apache.qpid.server.transport.QpidAcceptor;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.virtualhost.VirtualHostRegistry;
-
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.util.Map;
-import java.util.UUID;
public interface IApplicationRegistry extends StatisticsGatherer
{
@@ -56,80 +37,5 @@ public interface IApplicationRegistry ex
*/
void close();
- /**
- * Get the low level configuration. For use cases where the configured object approach is not required
- * you can get the complete configuration information.
- * @return a Commons Configuration instance
- */
- ServerConfiguration getConfiguration();
-
- /**
- * Get the AuthenticationManager for the given socket address
- *
- * If no AuthenticationManager has been specifically set for the given address, then use the default
- * AuthenticationManager
- *
- * @param address The (listening) socket address for which the AuthenticationManager is required
- * @return the AuthenticationManager
- */
- AuthenticationManager getAuthenticationManager(SocketAddress address);
-
- IAuthenticationManagerRegistry getAuthenticationManagerRegistry();
-
- VirtualHostRegistry getVirtualHostRegistry();
-
- SecurityManager getSecurityManager();
-
- PluginManager getPluginManager();
-
- ConfigurationManager getConfigurationManager();
-
- RootMessageLogger getRootMessageLogger();
-
- /**
- * Register any acceptors for this registry
- * @param bindAddress The address that the acceptor has been bound with
- * @param acceptor The acceptor in use
- */
- void addAcceptor(InetSocketAddress bindAddress, QpidAcceptor acceptor);
-
- public UUID getBrokerId();
-
- QMFService getQMFService();
-
- void setBrokerConfig(BrokerConfig broker);
-
- BrokerConfig getBrokerConfig();
-
Broker getBroker();
-
- VirtualHost createVirtualHost(VirtualHostConfiguration vhostConfig) throws Exception;
-
- ConfigStore getConfigStore();
-
- void setConfigStore(ConfigStore store);
-
- void initialiseStatisticsReporting();
-
- Map<InetSocketAddress, QpidAcceptor> getAcceptors();
-
- void addPortBindingListener(PortBindingListener listener);
-
- boolean useHTTPManagement();
-
- int getHTTPManagementPort();
-
- boolean useHTTPSManagement();
-
- int getHTTPSManagementPort();
-
- void addRegistryChangeListener(IAuthenticationManagerRegistry.RegistryChangeListener registryChangeListener);
-
- public interface PortBindingListener
- {
- public void bound(QpidAcceptor acceptor, InetSocketAddress bindAddress);
- public void unbound(QpidAcceptor acceptor);
-
- }
-
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/AuthorizationHolder.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/AuthorizationHolder.java?rev=1451244&r1=1451243&r2=1451244&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/AuthorizationHolder.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/security/AuthorizationHolder.java Thu Feb 28 16:14:30 2013
@@ -24,14 +24,14 @@ import javax.security.auth.Subject;
import java.security.Principal;
/**
- * Represents the authorization of the logged on user.
- *
+ * Represents the authorization of the logged on user.
+ *
*/
public interface AuthorizationHolder
{
- /**
+ /**
* Returns the {@link Subject} of the authorized user. This is guaranteed to
- * contain at least one {@link org.apache.qpid.server.security.auth.sasl.UsernamePrincipal}, representing the the identity
+ * contain at least one {@link org.apache.qpid.server.security.auth.UsernamePrincipal}, representing the the identity
* used when the user logged on to the application, and zero or more {@link org.apache.qpid.server.security.auth.sasl.GroupPrincipal}
* representing the group(s) to which the user belongs.
*
@@ -39,10 +39,10 @@ public interface AuthorizationHolder
*/
Subject getAuthorizedSubject();
- /**
+ /**
* Returns the {@link Principal} representing the the identity
* used when the user logged on to the application.
- *
+ *
* @return a Principal
*/
Principal getAuthorizedPrincipal();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org