You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2012/08/03 14:14:12 UTC
svn commit: r1368910 [15/27] - in /qpid/branches/asyncstore: ./ bin/
cpp/bindings/qmf/ruby/ cpp/bindings/qmf2/ruby/ cpp/bindings/qpid/python/
cpp/bindings/qpid/ruby/ cpp/bindings/qpid/ruby/features/
cpp/bindings/qpid/ruby/features/step_definitions/ cpp...
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java Fri Aug 3 12:13:32 2012
@@ -31,24 +31,21 @@ import java.util.UUID;
public interface AMQConnectionModel extends StatisticsGatherer
{
/**
- * get a unique id for this connection.
- *
- * @return a {@link UUID} representing the connection
- */
- public UUID getId();
-
- /**
* Close the underlying Connection
- *
+ *
* @param cause
* @param message
* @throws org.apache.qpid.AMQException
*/
public void close(AMQConstant cause, String message) throws AMQException;
+ public void block();
+
+ public void unblock();
+
/**
* Close the given requested Session
- *
+ *
* @param session
* @param cause
* @param message
@@ -57,10 +54,10 @@ public interface AMQConnectionModel exte
public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException;
public long getConnectionId();
-
+
/**
* Get a list of all sessions using this connection.
- *
+ *
* @return a list of {@link AMQSessionModel}s
*/
public List<AMQSessionModel> getSessionModels();
@@ -73,4 +70,16 @@ public interface AMQConnectionModel exte
public String getUserName();
public boolean isSessionNameUnique(byte[] name);
+
+ String getRemoteAddressString();
+
+ String getClientId();
+
+ String getClientVersion();
+
+ String getPrincipalAsString();
+
+ long getSessionCountLimit();
+
+ long getLastIoTime();
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolEngine.java Fri Aug 3 12:13:32 2012
@@ -20,8 +20,26 @@
*/
package org.apache.qpid.server.protocol;
-import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.security.Principal;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import javax.security.auth.Subject;
+import javax.security.sasl.SaslServer;
+import org.apache.log4j.Logger;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
@@ -46,13 +64,10 @@ import org.apache.qpid.server.logging.ac
import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.logging.messages.ConnectionMessages;
import org.apache.qpid.server.logging.subjects.ConnectionLogSubject;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.output.ProtocolOutputConverter;
import org.apache.qpid.server.output.ProtocolOutputConverterRegistry;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.stats.StatisticsCounter;
@@ -66,25 +81,7 @@ import org.apache.qpid.transport.Transpo
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-import javax.management.JMException;
-import javax.security.auth.Subject;
-import javax.security.sasl.SaslServer;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.SocketAddress;
-import java.nio.ByteBuffer;
-import java.security.Principal;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class AMQProtocolEngine implements ServerProtocolEngine, Managable, AMQProtocolSession, ConnectionConfig
+public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession, ConnectionConfig
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -109,8 +106,6 @@ public class AMQProtocolEngine implement
private AMQCodecFactory _codecFactory;
- private AMQProtocolSessionMBean _managedObject;
-
private SaslServer _saslServer;
private Object _lastReceived;
@@ -147,12 +142,10 @@ public class AMQProtocolEngine implement
private long _maxFrameSize;
private final AtomicBoolean _closing = new AtomicBoolean(false);
- private final UUID _id;
+ private final UUID _qmfId;
private final ConfigStore _configStore;
private long _createTime = System.currentTimeMillis();
- private ApplicationRegistry _registry;
- private boolean _statisticsEnabled = false;
private StatisticsCounter _messagesDelivered, _dataDelivered, _messagesReceived, _dataReceived;
private NetworkConnection _network;
@@ -160,14 +153,13 @@ public class AMQProtocolEngine implement
private volatile boolean _deferFlush;
private long _lastReceivedTime;
+ private boolean _blocking;
- public ManagedObject getManagedObject()
- {
- return _managedObject;
- }
+ private final Lock _receivedLock;
public AMQProtocolEngine(VirtualHostRegistry virtualHostRegistry, NetworkConnection network, final long connectionId)
{
+ _receivedLock = new ReentrantLock();
_stateManager = new AMQStateManager(virtualHostRegistry, this);
_codecFactory = new AMQCodecFactory(true, this);
@@ -179,12 +171,12 @@ public class AMQProtocolEngine implement
_logSubject = new ConnectionLogSubject(this);
_configStore = virtualHostRegistry.getConfigStore();
- _id = _configStore.createId();
+ _qmfId = _configStore.createId();
_actor.message(ConnectionMessages.OPEN(null, null, null, false, false, false));
- _registry = virtualHostRegistry.getApplicationRegistry();
initialiseStatistics();
+
}
public void setNetworkConnection(NetworkConnection network)
@@ -198,11 +190,6 @@ public class AMQProtocolEngine implement
_sender = sender;
}
- private AMQProtocolSessionMBean createMBean() throws JMException
- {
- return new AMQProtocolSessionMBean(this);
- }
-
public long getSessionID()
{
return _connectionID;
@@ -244,6 +231,8 @@ public class AMQProtocolEngine implement
final long arrivalTime = System.currentTimeMillis();
_lastReceivedTime = arrivalTime;
_lastIoTime = arrivalTime;
+
+ _receivedLock.lock();
try
{
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
@@ -268,6 +257,10 @@ public class AMQProtocolEngine implement
_logger.error("Unexpected exception when processing datablock", e);
closeProtocolSession();
}
+ finally
+ {
+ _receivedLock.unlock();
+ }
}
private void receiveComplete()
@@ -374,7 +367,7 @@ public class AMQProtocolEngine implement
// This sets the protocol version (and hence framing classes) for this session.
setProtocolVersion(pv);
- String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
+ String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager(getLocalAddress()).getMechanisms();
String locales = "en_US";
@@ -576,7 +569,10 @@ public class AMQProtocolEngine implement
public List<AMQChannel> getChannels()
{
- return new ArrayList<AMQChannel>(_channelMap.values());
+ synchronized (_channelMap)
+ {
+ return new ArrayList<AMQChannel>(_channelMap.values());
+ }
}
public AMQChannel getAndAssertChannel(int channelId) throws AMQException
@@ -633,24 +629,21 @@ public class AMQProtocolEngine implement
}
else
{
- _channelMap.put(channel.getChannelId(), channel);
+ synchronized (_channelMap)
+ {
+ _channelMap.put(channel.getChannelId(), channel);
+
+ if(_blocking)
+ {
+ channel.block();
+ }
+ }
}
if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
{
_cachedChannels[channelId] = channel;
}
-
- checkForNotification();
- }
-
- private void checkForNotification()
- {
- int channelsCount = _channelMap.size();
- if (_managedObject != null && channelsCount >= _maxNoOfChannels)
- {
- _managedObject.notifyClients("Channel count (" + channelsCount + ") has reached the threshold value");
- }
}
public Long getMaximumNumberOfChannels()
@@ -735,10 +728,14 @@ public class AMQProtocolEngine implement
*/
public void removeChannel(int channelId)
{
- _channelMap.remove(channelId);
- if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ synchronized (_channelMap)
{
- _cachedChannels[channelId] = null;
+ _channelMap.remove(channelId);
+
+ if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
+ {
+ _cachedChannels[channelId] = null;
+ }
}
}
@@ -763,12 +760,14 @@ public class AMQProtocolEngine implement
*/
private void closeAllChannels() throws AMQException
{
- for (AMQChannel channel : _channelMap.values())
+ for (AMQChannel channel : getChannels())
{
channel.close();
}
-
- _channelMap.clear();
+ synchronized (_channelMap)
+ {
+ _channelMap.clear();
+ }
for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
{
_cachedChannels[i] = null;
@@ -780,6 +779,9 @@ public class AMQProtocolEngine implement
{
if(_closing.compareAndSet(false,true))
{
+ // force sync of outstanding async work
+ receiveComplete();
+
// REMOVE THIS SHOULD NOT BE HERE.
if (CurrentActor.get() == null)
{
@@ -796,13 +798,6 @@ public class AMQProtocolEngine implement
getConfigStore().removeConfiguredObject(this);
- if (_managedObject != null)
- {
- _managedObject.unregister();
- // Ensure we only do this once.
- _managedObject = null;
- }
-
for (Task task : _taskList)
{
task.doTask(this);
@@ -835,7 +830,7 @@ public class AMQProtocolEngine implement
}
}
- public void closeConnection(int channelId, AMQConnectionException e) throws AMQException
+ private void closeConnection(int channelId, AMQConnectionException e) throws AMQException
{
try
{
@@ -846,12 +841,18 @@ public class AMQProtocolEngine implement
markChannelAwaitingCloseOk(channelId);
closeSession();
- _stateManager.changeState(AMQState.CONNECTION_CLOSING);
- writeFrame(e.getCloseFrame(channelId));
}
finally
{
- closeProtocolSession();
+ try
+ {
+ _stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ writeFrame(e.getCloseFrame(channelId));
+ }
+ finally
+ {
+ closeProtocolSession();
+ }
}
@@ -983,16 +984,6 @@ public class AMQProtocolEngine implement
_virtualHost.getConnectionRegistry().registerConnection(this);
_configStore.addConfiguredObject(this);
-
- try
- {
- _managedObject = createMBean();
- _managedObject.register();
- }
- catch (JMException e)
- {
- _logger.error(e);
- }
}
public void addSessionCloseTask(Task task)
@@ -1026,7 +1017,7 @@ public class AMQProtocolEngine implement
public Principal getAuthorizedPrincipal()
{
- return _authorizedSubject == null ? null : UsernamePrincipal.getUsernamePrincipalFromSubject(_authorizedSubject);
+ return _authorizedSubject == null ? null : _authorizedSubject.getPrincipals().iterator().next();
}
public SocketAddress getRemoteAddress()
@@ -1039,6 +1030,11 @@ public class AMQProtocolEngine implement
return _network.getLocalAddress();
}
+ public Principal getPeerPrincipal()
+ {
+ return _network.getPeerPrincipal();
+ }
+
public MethodRegistry getMethodRegistry()
{
return _methodRegistry;
@@ -1144,6 +1140,16 @@ public class AMQProtocolEngine implement
return _clientVersion;
}
+ public String getPrincipalAsString()
+ {
+ return getAuthId();
+ }
+
+ public long getSessionCountLimit()
+ {
+ return getMaximumNumberOfChannels();
+ }
+
public Boolean isIncoming()
{
return true;
@@ -1199,9 +1205,10 @@ public class AMQProtocolEngine implement
return false;
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public long getConnectionId()
@@ -1337,6 +1344,36 @@ public class AMQProtocolEngine implement
(Throwable) null));
}
+ public void block()
+ {
+ synchronized (_channelMap)
+ {
+ if(!_blocking)
+ {
+ _blocking = true;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.block();
+ }
+ }
+ }
+ }
+
+ public void unblock()
+ {
+ synchronized (_channelMap)
+ {
+ if(_blocking)
+ {
+ _blocking = false;
+ for(AMQChannel channel : _channelMap.values())
+ {
+ channel.unblock();
+ }
+ }
+ }
+ }
+
public boolean isClosed()
{
return _closed;
@@ -1344,12 +1381,7 @@ public class AMQProtocolEngine implement
public List<AMQSessionModel> getSessionModels()
{
- List<AMQSessionModel> sessions = new ArrayList<AMQSessionModel>();
- for (AMQChannel channel : getChannels())
- {
- sessions.add((AMQSessionModel) channel);
- }
- return sessions;
+ return new ArrayList<AMQSessionModel>(getChannels());
}
public LogSubject getLogSubject()
@@ -1359,21 +1391,15 @@ public class AMQProtocolEngine implement
public void registerMessageDelivered(long messageSize)
{
- if (isStatisticsEnabled())
- {
- _messagesDelivered.registerEvent(1L);
- _dataDelivered.registerEvent(messageSize);
- }
+ _messagesDelivered.registerEvent(1L);
+ _dataDelivered.registerEvent(messageSize);
_virtualHost.registerMessageDelivered(messageSize);
}
public void registerMessageReceived(long messageSize, long timestamp)
{
- if (isStatisticsEnabled())
- {
- _messagesReceived.registerEvent(1L, timestamp);
- _dataReceived.registerEvent(messageSize, timestamp);
- }
+ _messagesReceived.registerEvent(1L, timestamp);
+ _dataReceived.registerEvent(messageSize, timestamp);
_virtualHost.registerMessageReceived(messageSize, timestamp);
}
@@ -1407,29 +1433,26 @@ public class AMQProtocolEngine implement
public void initialiseStatistics()
{
- setStatisticsEnabled(!StatisticsCounter.DISABLE_STATISTICS &&
- _registry.getConfiguration().isStatisticsGenerationConnectionsEnabled());
-
_messagesDelivered = new StatisticsCounter("messages-delivered-" + getSessionID());
_dataDelivered = new StatisticsCounter("data-delivered-" + getSessionID());
_messagesReceived = new StatisticsCounter("messages-received-" + getSessionID());
_dataReceived = new StatisticsCounter("data-received-" + getSessionID());
}
- public boolean isStatisticsEnabled()
+ public boolean isSessionNameUnique(byte[] name)
{
- return _statisticsEnabled;
+ // 0-8/0-9/0-9-1 sessions don't have names
+ return true;
}
- public void setStatisticsEnabled(boolean enabled)
+ public String getRemoteAddressString()
{
- _statisticsEnabled = enabled;
+ return String.valueOf(getRemoteAddress());
}
- public boolean isSessionNameUnique(byte[] name)
+ public String getClientId()
{
- // 0-8/0-9/0-9-1 sessions don't have names
- return true;
+ return String.valueOf(getContextKey());
}
public void setDeferFlush(boolean deferFlush)
@@ -1466,4 +1489,9 @@ public class AMQProtocolEngine implement
{
return _reference;
}
+
+ public Lock getReceivedLock()
+ {
+ return _receivedLock;
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Fri Aug 3 12:13:32 2012
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.server.protocol;
+import java.net.SocketAddress;
+import java.security.Principal;
import java.util.List;
+import java.util.concurrent.locks.Lock;
import javax.security.auth.Subject;
import javax.security.sasl.SaslServer;
-import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -60,6 +62,13 @@ public interface AMQProtocolSession exte
long getLastReceivedTime();
+ /**
+ * Return the local socket address for the connection
+ *
+ * @return the socket address
+ */
+ SocketAddress getLocalAddress();
+
public static interface Task
{
public void doTask(AMQProtocolSession session) throws AMQException;
@@ -145,10 +154,6 @@ public interface AMQProtocolSession exte
void closeProtocolSession();
- /** This must be called to close the session in order to free up any resources managed by the session. */
- void closeConnection(int channelId, AMQConnectionException e) throws AMQException;
-
-
/** @return a key that uniquely identifies this session */
Object getKey();
@@ -210,4 +215,7 @@ public interface AMQProtocolSession exte
void mgmtCloseChannel(int channelId);
+ public Principal getPeerPrincipal();
+
+ Lock getReceivedLock();
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java Fri Aug 3 12:13:32 2012
@@ -37,26 +37,26 @@ import org.apache.qpid.server.queue.Simp
public interface AMQSessionModel extends Comparable<AMQSessionModel>
{
/** Unique session ID across entire broker*/
- public UUID getId();
+ public UUID getQMFId();
public AMQConnectionModel getConnectionModel();
public String getClientID();
-
+
public void close() throws AMQException;
public LogSubject getLogSubject();
-
+
/**
* This method is called from the housekeeping thread to check the status of
* transactions on this session and react appropriately.
- *
+ *
* If a transaction is open for too long or idle for too long then a warning
* is logged or the connection is closed, depending on the configuration. An open
* transaction is one that has recent activity. The transaction age is counted
- * from the time the transaction was started. An idle transaction is one that
- * has had no activity, such as publishing or acknowledgeing messages.
- *
+ * from the time the transaction was started. An idle transaction is one that
+ * has had no activity, such as publishing or acknowledging messages.
+ *
* @param openWarn time in milliseconds before alerting on open transaction
* @param openClose time in milliseconds before closing connection with open transaction
* @param idleWarn time in milliseconds before alerting on idle transaction
@@ -68,6 +68,22 @@ public interface AMQSessionModel extends
void unblock(AMQQueue queue);
+ void block();
+
+ void unblock();
+
+ boolean getBlocking();
boolean onSameConnection(InboundMessage inbound);
+
+ int getUnacknowledgedMessageCount();
+
+ Long getTxnCount();
+ Long getTxnStart();
+ Long getTxnCommits();
+ Long getTxnRejects();
+
+ int getChannelId();
+
+ int getConsumerCount();
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/MultiVersionProtocolEngine.java Fri Aug 3 12:13:32 2012
@@ -302,7 +302,7 @@ public class MultiVersionProtocolEngine
public ServerProtocolEngine getProtocolEngine()
{
final ConnectionDelegate connDelegate =
- new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn);
+ new org.apache.qpid.server.transport.ServerConnectionDelegate(_appRegistry, _fqdn, _appRegistry.getAuthenticationManager(getLocalAddress()));
ServerConnection conn = new ServerConnection(_id);
conn.setConnectionDelegate(connDelegate);
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_0_10.java Fri Aug 3 12:13:32 2012
@@ -47,7 +47,7 @@ public class ProtocolEngine_0_10 extend
private long _readBytes;
private long _writtenBytes;
private ServerConnection _connection;
- private final UUID _id;
+ private final UUID _qmfId;
private final IApplicationRegistry _appRegistry;
private long _createTime = System.currentTimeMillis();
@@ -59,7 +59,7 @@ public class ProtocolEngine_0_10 extend
_connection = conn;
_connection.setConnectionConfig(this);
- _id = appRegistry.getConfigStore().createId();
+ _qmfId = appRegistry.getConfigStore().createId();
_appRegistry = appRegistry;
if(network != null)
@@ -88,7 +88,7 @@ public class ProtocolEngine_0_10 extend
_network = network;
_connection.setSender(new Disassembler(sender, MAX_FRAME_SIZE));
-
+ _connection.setPeerPrincipal(_network.getPeerPrincipal());
// FIXME Two log messages to maintain compatibility with earlier protocol versions
_connection.getLogActor().message(ConnectionMessages.OPEN(null, null, null, false, false, false));
_connection.getLogActor().message(ConnectionMessages.OPEN(null, "0-10", null, false, true, false));
@@ -180,9 +180,10 @@ public class ProtocolEngine_0_10 extend
return _appRegistry.getConfigStore();
}
- public UUID getId()
+ @Override
+ public UUID getQMFId()
{
- return _id;
+ return _qmfId;
}
public ConnectionConfigType getConfigType()
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java Fri Aug 3 12:13:32 2012
@@ -26,12 +26,13 @@ import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
import org.apache.qpid.amqp_1_0.framing.FrameHandler;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
-import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
+import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.Container;
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
@@ -95,7 +96,7 @@ public class ProtocolEngine_1_0_0 implem
}
private State _state = State.A;
-
+
public ProtocolEngine_1_0_0(final IApplicationRegistry appRegistry, long id)
@@ -143,8 +144,9 @@ public class ProtocolEngine_1_0_0 implem
Container container = new Container(_appRegistry.getBrokerId().toString());
- _conn = new ConnectionEndpoint(container,asCallbackHandlerSource(_appRegistry.getAuthenticationManager()));
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn = new ConnectionEndpoint(container, asSaslServerProvider(_appRegistry.getAuthenticationManager(
+ getLocalAddress())));
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
_conn.setFrameOutputHandler(this);
_conn.setRemoteAddress(_network.getRemoteAddress());
@@ -155,14 +157,14 @@ public class ProtocolEngine_1_0_0 implem
_sender.flush();
}
- private CallbackHandlerSource asCallbackHandlerSource(final AuthenticationManager authenticationManager)
+ private SaslServerProvider asSaslServerProvider(final AuthenticationManager authenticationManager)
{
- return new CallbackHandlerSource()
+ return new SaslServerProvider()
{
@Override
- public CallbackHandler getHandler(String mechanism)
+ public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException
{
- return authenticationManager.getHandler(mechanism);
+ return authenticationManager.createSaslServer(mechanism, fqdn, null);
}
};
}
Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0.java:r1333988-1368650
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java Fri Aug 3 12:13:32 2012
@@ -26,13 +26,14 @@ import java.nio.ByteBuffer;
import java.util.UUID;
import java.util.logging.Level;
import java.util.logging.Logger;
-import javax.security.auth.callback.CallbackHandler;
+import javax.security.sasl.SaslException;
+import javax.security.sasl.SaslServer;
import org.apache.qpid.amqp_1_0.codec.FrameWriter;
import org.apache.qpid.amqp_1_0.codec.ProtocolHandler;
import org.apache.qpid.amqp_1_0.framing.AMQFrame;
import org.apache.qpid.amqp_1_0.framing.OversizeFrameException;
import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler;
-import org.apache.qpid.amqp_1_0.transport.CallbackHandlerSource;
+import org.apache.qpid.amqp_1_0.transport.SaslServerProvider;
import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.Container;
import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler;
@@ -57,7 +58,7 @@ public class ProtocolEngine_1_0_0_SASL i
private long _createTime = System.currentTimeMillis();
private ConnectionEndpoint _conn;
private long _connectionId;
-
+
private static final ByteBuffer HEADER =
ByteBuffer.wrap(new byte[]
{
@@ -163,9 +164,9 @@ public class ProtocolEngine_1_0_0_SASL i
Container container = new Container(_appRegistry.getBrokerId().toString());
- _conn = new ConnectionEndpoint(container, asCallbackHandlerSource(ApplicationRegistry.getInstance()
- .getAuthenticationManager()));
- _conn.setConnectionEventListener(new Connection_1_0(_appRegistry));
+ _conn = new ConnectionEndpoint(container, asSaslServerProvider(ApplicationRegistry.getInstance()
+ .getAuthenticationManager(getLocalAddress())));
+ _conn.setConnectionEventListener(new Connection_1_0(_appRegistry, _conn, _connectionId));
_conn.setRemoteAddress(getRemoteAddress());
@@ -200,14 +201,14 @@ public class ProtocolEngine_1_0_0_SASL i
}
- private CallbackHandlerSource asCallbackHandlerSource(final AuthenticationManager authenticationManager)
+ private SaslServerProvider asSaslServerProvider(final AuthenticationManager authenticationManager)
{
- return new CallbackHandlerSource()
+ return new SaslServerProvider()
{
@Override
- public CallbackHandler getHandler(String mechanism)
+ public SaslServer getSaslServer(String mechanism, String fqdn) throws SaslException
{
- return authenticationManager.getHandler(mechanism);
+ return authenticationManager.createSaslServer(mechanism, fqdn, null);
}
};
}
Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ProtocolEngine_1_0_0_SASL.java:r1333988-1368650
Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0:r1333988-1368650
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java Fri Aug 3 12:13:32 2012
@@ -20,21 +20,35 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.text.MessageFormat;
+import java.util.Collection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint;
import org.apache.qpid.amqp_1_0.transport.ConnectionEventListener;
import org.apache.qpid.amqp_1_0.transport.SessionEndpoint;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.stats.StatisticsCounter;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+
public class Connection_1_0 implements ConnectionEventListener
{
private IApplicationRegistry _appRegistry;
private VirtualHost _vhost;
+ private final ConnectionEndpoint _conn;
+ private final long _connectionId;
+ private final Collection<Session_1_0> _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>());
public static interface Task
@@ -48,18 +62,27 @@ public class Connection_1_0 implements C
- public Connection_1_0(IApplicationRegistry appRegistry)
+ public Connection_1_0(IApplicationRegistry appRegistry, ConnectionEndpoint conn, long connectionId)
{
_appRegistry = appRegistry;
_vhost = _appRegistry.getVirtualHostRegistry().getDefaultVirtualHost();
+ _conn = conn;
+ _connectionId = connectionId;
+ _vhost.getConnectionRegistry().registerConnection(_model);
+
}
public void remoteSessionCreation(SessionEndpoint endpoint)
{
Session_1_0 session = new Session_1_0(_vhost, _appRegistry, this);
+ _sessions.add(session);
endpoint.setSessionEventListener(session);
}
+ void sessionEnded(Session_1_0 session)
+ {
+ _sessions.remove(session);
+ }
void removeConnectionCloseTask(final Task task)
{
@@ -86,6 +109,8 @@ public class Connection_1_0 implements C
{
_closeTasks.clear();
}
+ _vhost.getConnectionRegistry().deregisterConnection(_model);
+
}
@@ -94,5 +119,174 @@ public class Connection_1_0 implements C
closeReceived();
}
+ private final AMQConnectionModel _model = new AMQConnectionModel()
+ {
+ private final StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter();
+ private final StatisticsCounter _messageReceiptStatistics = new StatisticsCounter();
+ private final StatisticsCounter _dataDeliveryStatistics = new StatisticsCounter();
+ private final StatisticsCounter _dataReceiptStatistics = new StatisticsCounter();
+
+ private final LogSubject _logSubject = new LogSubject()
+ {
+ @Override
+ public String toLogString()
+ {
+ return "[" +
+ MessageFormat.format(CONNECTION_FORMAT,
+ getConnectionId(),
+ getClientId(),
+ getRemoteAddressString(),
+ _vhost.getName())
+ + "] ";
+
+ }
+ };
+
+ @Override
+ public void close(AMQConstant cause, String message) throws AMQException
+ {
+ // TODO
+ }
+
+ @Override
+ public void block()
+ {
+ // TODO
+ }
+
+ @Override
+ public void unblock()
+ {
+ // TODO
+ }
+
+ @Override
+ public void closeSession(AMQSessionModel session, AMQConstant cause, String message) throws AMQException
+ {
+ // TODO
+ }
+
+ @Override
+ public long getConnectionId()
+ {
+ return _connectionId;
+ }
+
+ @Override
+ public List<AMQSessionModel> getSessionModels()
+ {
+ return new ArrayList<AMQSessionModel>(_sessions);
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return _logSubject;
+ }
+
+ @Override
+ public String getUserName()
+ {
+ return getPrincipalAsString();
+ }
+
+ @Override
+ public boolean isSessionNameUnique(byte[] name)
+ {
+ return true; // TODO
+ }
+
+ @Override
+ public String getRemoteAddressString()
+ {
+ return String.valueOf(_conn.getRemoteAddress());
+ }
+
+ @Override
+ public String getClientId()
+ {
+ return _conn.getRemoteContainerId();
+ }
+
+ @Override
+ public String getClientVersion()
+ {
+ return ""; //TODO
+ }
+
+ @Override
+ public String getPrincipalAsString()
+ {
+ return String.valueOf(_conn.getUser());
+ }
+
+ @Override
+ public long getSessionCountLimit()
+ {
+ return 0; // TODO
+ }
+
+ @Override
+ public long getLastIoTime()
+ {
+ return 0; // TODO
+ }
+
+ @Override
+ public void initialiseStatistics()
+ {
+ // TODO
+ }
+
+ @Override
+ public void registerMessageReceived(long messageSize, long timestamp)
+ {
+ // TODO
+ }
+
+ @Override
+ public void registerMessageDelivered(long messageSize)
+ {
+ // TODO
+ }
+
+ @Override
+ public StatisticsCounter getMessageDeliveryStatistics()
+ {
+ return _messageDeliveryStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getMessageReceiptStatistics()
+ {
+ return _messageReceiptStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getDataDeliveryStatistics()
+ {
+ return _dataDeliveryStatistics;
+ }
+
+ @Override
+ public StatisticsCounter getDataReceiptStatistics()
+ {
+ return _dataReceiptStatistics;
+ }
+
+ @Override
+ public void resetStatistics()
+ {
+ // TODO
+ }
+
+
+ };
+
+ AMQConnectionModel getModel()
+ {
+ return _model;
+ }
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkRegistry.java Fri Aug 3 12:13:32 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.protocol.v1_0;
import java.util.HashMap;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java Fri Aug 3 12:13:32 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java Fri Aug 3 12:13:32 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Aug 3 12:13:32 2012
@@ -200,7 +200,7 @@ public class SendingLink_1_0 implements
if(queue == null)
{
queue = AMQQueueFactory.createAMQQueueImpl(
- UUIDGenerator.generateUUID(),
+ UUIDGenerator.generateQueueUUID(name, _vhost.getName()),
name,
isDurable,
null,
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java Fri Aug 3 12:13:32 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.protocol.v1_0;
+import java.text.MessageFormat;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
@@ -35,18 +36,26 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
+import org.apache.qpid.protocol.ProtocolEngine;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.IApplicationRegistry;
+import org.apache.qpid.server.transport.ServerConnection;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.*;
-public class Session_1_0 implements SessionEventListener
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+
+public class Session_1_0 implements SessionEventListener, AMQSessionModel, LogSubject
{
private static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy");
private IApplicationRegistry _appRegistry;
@@ -56,6 +65,7 @@ public class Session_1_0 implements Sess
private final LinkedHashMap<Integer, ServerTransaction> _openTransactions =
new LinkedHashMap<Integer, ServerTransaction>();
private final Connection_1_0 _connection;
+ private UUID _id = UUID.randomUUID();
public Session_1_0(VirtualHost vhost, IApplicationRegistry appRegistry, final Connection_1_0 connection)
@@ -319,7 +329,7 @@ public class Session_1_0 implements Sess
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
- final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateUUID(),
+ final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateQueueUUID(queueName, _vhost.getName()),
queueName,
false, // durable
null, // owner
@@ -405,6 +415,8 @@ public class Session_1_0 implements Sess
iter.remove();
}
+ _connection.sessionEnded(this);
+
}
Integer binaryToInteger(final Binary txnId)
@@ -443,4 +455,153 @@ public class Session_1_0 implements Sess
public void forceEnd()
{
}
+
+ @Override
+ public UUID getQMFId()
+ {
+ return _id;
+ }
+
+ @Override
+ public AMQConnectionModel getConnectionModel()
+ {
+ return _connection.getModel();
+ }
+
+ @Override
+ public String getClientID()
+ {
+ // TODO
+ return "";
+ }
+
+ @Override
+ public void close() throws AMQException
+ {
+ // TODO - required for AMQSessionModel / management initiated closing
+ }
+
+ @Override
+ public LogSubject getLogSubject()
+ {
+ return this;
+ }
+
+ @Override
+ public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose) throws AMQException
+ {
+ // TODO - required for AMQSessionModel / long running transaction detection
+ }
+
+ @Override
+ public void block(AMQQueue queue)
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public void unblock(AMQQueue queue)
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public void block()
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public void unblock()
+ {
+ // TODO - required for AMQSessionModel / producer side flow control
+ }
+
+ @Override
+ public boolean getBlocking()
+ {
+ // TODO
+ return false;
+ }
+
+ @Override
+ public boolean onSameConnection(InboundMessage inbound)
+ {
+ // TODO
+ return false;
+ }
+
+ @Override
+ public int getUnacknowledgedMessageCount()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public Long getTxnCount()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public Long getTxnStart()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public Long getTxnCommits()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public Long getTxnRejects()
+ {
+ // TODO
+ return 0l;
+ }
+
+ @Override
+ public int getChannelId()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public int getConsumerCount()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public int compareTo(AMQSessionModel o)
+ {
+ return getQMFId().compareTo(o.getQMFId());
+ }
+
+
+
+ public String toLogString()
+ {
+ long connectionId = getConnectionModel().getConnectionId();
+
+ String remoteAddress = getConnectionModel().getRemoteAddressString();
+
+ return "[" +
+ MessageFormat.format(CHANNEL_FORMAT,
+ connectionId,
+ getClientID(),
+ remoteAddress,
+ _vhost.getName(), // TODO - virtual host
+ 0) // TODO - channel
+ + "] ";
+ }
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java Fri Aug 3 12:13:32 2012
@@ -49,6 +49,7 @@ import org.apache.qpid.amqp_1_0.type.tra
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.subscription.Subscription;
@@ -66,7 +67,7 @@ class Subscription_1_0 implements Subscr
private final QueueEntry.SubscriptionAssignedState _assignedState = new QueueEntry.SubscriptionAssignedState(this);
private final long _id;
private final boolean _acquires;
- private AMQQueue.Context _queueContext;
+ private volatile AMQQueue.Context _queueContext;
private Map<String, Object> _properties = new ConcurrentHashMap<String, Object>();
private ReentrantLock _stateChangeLock = new ReentrantLock();
@@ -631,4 +632,46 @@ class Subscription_1_0 implements Subscr
{
_filters = filters;
}
+
+ @Override
+ public AMQSessionModel getSessionModel()
+ {
+ // TODO
+ return getSession();
+ }
+
+ @Override
+ public long getBytesOut()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public long getMessagesOut()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public long getUnacknowledgedBytes()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public long getUnacknowledgedMessages()
+ {
+ // TODO
+ return 0;
+ }
+
+ @Override
+ public String getConsumerName()
+ {
+ //TODO
+ return "TODO";
+ }
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/UnsettledAction.java Fri Aug 3 12:13:32 2012
@@ -1,3 +1,23 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.server.protocol.v1_0;
import org.apache.qpid.amqp_1_0.type.DeliveryState;
Propchange: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/
------------------------------------------------------------------------------
Merged /qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue:r1333988-1368650
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java Fri Aug 3 12:13:32 2012
@@ -28,21 +28,25 @@ import org.apache.qpid.server.configurat
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeReferrer;
import org.apache.qpid.server.logging.LogSubject;
-import org.apache.qpid.server.management.Managable;
-import org.apache.qpid.server.management.ManagedObject;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.security.AuthorizationHolder;
import org.apache.qpid.server.store.TransactionLogResource;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
-public interface AMQQueue extends Managable, Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue,
+public interface AMQQueue extends Comparable<AMQQueue>, ExchangeReferrer, TransactionLogResource, BaseQueue,
QueueConfig
{
+ public interface NotificationListener
+ {
+ void notifyClients(NotificationCheck notification, AMQQueue queue, String notificationMsg);
+ }
+
boolean getDeleteOnNoConsumers();
void setDeleteOnNoConsumers(boolean b);
@@ -57,6 +61,12 @@ public interface AMQQueue extends Managa
LogSubject getLogSubject();
+ long getUnackedMessageBytes();
+
+ long getTotalDequeueCount();
+
+ long getTotalEnqueueCount();
+
public interface Context
{
QueueEntry getLastSeenEntry();
@@ -79,6 +89,17 @@ public interface AMQQueue extends Managa
void unregisterSubscription(final Subscription subscription) throws AMQException;
+ Collection<Subscription> getConsumers();
+
+ interface SubscriptionRegistrationListener
+ {
+ void subscriptionRegistered(AMQQueue queue, Subscription subscription);
+ void subscriptionUnregistered(AMQQueue queue, Subscription subscription);
+ }
+
+ void addSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
+ void removeSubscriptionRegistrationListener(SubscriptionRegistrationListener listener);
+
int getConsumerCount();
@@ -109,7 +130,7 @@ public interface AMQQueue extends Managa
void dequeue(QueueEntry entry, Subscription sub);
- void decrementUnackedMsgCount();
+ void decrementUnackedMsgCount(QueueEntry queueEntry);
boolean resend(final QueueEntry entry, final Subscription subscription) throws AMQException;
@@ -139,20 +160,8 @@ public interface AMQQueue extends Managa
*/
public List<QueueEntry> getMessagesRangeOnTheQueue(final long fromPosition, final long toPosition);
+ void visit(QueueEntryVisitor visitor);
- void moveMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
-
- void copyMessagesToAnotherQueue(long fromMessageId, long toMessageId, String queueName);
-
- void removeMessagesFromQueue(long fromMessageId, long toMessageId);
-
- static interface Visitor
- {
- boolean visit(QueueEntry entry);
- }
-
- void visit(Visitor visitor);
-
long getMaximumMessageSize();
@@ -216,8 +225,6 @@ public interface AMQQueue extends Managa
void setAlternateExchange(Exchange exchange);
- void setAlternateExchange(String exchangeName);
-
Map<String, Object> getArguments();
void checkCapacity(AMQSessionModel channel);
@@ -245,12 +252,12 @@ public interface AMQQueue extends Managa
}
/**
- * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusize subscription, as a subscription
+ * ExistingSubscriptionPreventsExclusive signals a failure to create an exclusive subscription, as a subscription
* already exists.
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represent failure to create an exclusize subscription, as a subscription already exists.
+ * <tr><td> Represent failure to create an exclusive subscription, as a subscription already exists.
* </table>
*
* @todo Not an AMQP exception as no status code.
@@ -274,9 +281,7 @@ public interface AMQQueue extends Managa
ConfigurationPlugin getConfiguration();
- ManagedObject getManagedObject();
-
- void setExclusive(boolean exclusive) throws AMQException;
+ void setExclusive(boolean exclusive);
/**
* Gets the maximum delivery count. If a message on this queue
@@ -295,4 +300,19 @@ public interface AMQQueue extends Managa
*/
public void setMaximumDeliveryCount(final int maximumDeliveryCount);
+ void setNotificationListener(NotificationListener listener);
+
+ /**
+ * Sets the free text description of this queue.
+ *
+ * @param description
+ *
+ */
+ void setDescription(String description);
+
+ /**
+ * Gets the free text description of this queue.
+ */
+ String getDescription();
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java Fri Aug 3 12:13:32 2012
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.queue;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
@@ -41,6 +42,7 @@ import org.apache.qpid.server.virtualhos
public class AMQQueueFactory
{
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
+ public static final String X_QPID_DESCRIPTION = "x-qpid-description";
public static final String QPID_LVQ_KEY = "qpid.LVQ_key";
public static final String QPID_LAST_VALUE_QUEUE = "qpid.last_value_queue";
public static final String QPID_LAST_VALUE_QUEUE_KEY = "qpid.last_value_queue_key";
@@ -169,29 +171,7 @@ public class AMQQueueFactory
};
/**
- * Creates a new queue with a random id.
- *
- * @see #createAMQQueueImpl(UUID, String, boolean, String, boolean, boolean, VirtualHost, Map)
- * @deprecated because only called from unit tests
- * */
- @Deprecated
- public static AMQQueue createAMQQueueImpl(AMQShortString name,
- boolean durable,
- AMQShortString owner,
- boolean autoDelete,
- boolean exclusive,
- VirtualHost virtualHost, final FieldTable arguments) throws AMQException
- {
- return createAMQQueueImpl(UUIDGenerator.generateUUID(),
- name == null ? null : name.toString(),
- durable,
- owner == null ? null : owner.toString(),
- autoDelete,
- exclusive, virtualHost, FieldTable.convertToMap(arguments));
- }
-
- /**
- * @param id the id to use. If default then one is generated from queueName. TODO check correctness of calls that pass a null value.
+ * @param id the id to use.
*/
public static AMQQueue createAMQQueueImpl(UUID id,
String queueName,
@@ -300,7 +280,7 @@ public class AMQQueueFactory
if(dlExchange == null)
{
- dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+ dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateExchangeUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
exchangeRegistry.registerExchange(dlExchange);
@@ -322,7 +302,7 @@ public class AMQQueueFactory
args.put(X_QPID_DLQ_ENABLED, false);
args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
- dlQueue = createAMQQueueImpl(UUIDGenerator.generateUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
+ dlQueue = createAMQQueueImpl(UUIDGenerator.generateQueueUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
//enter the dlq in the persistent store
virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
@@ -350,42 +330,16 @@ public class AMQQueueFactory
boolean autodelete = config.getAutoDelete();
boolean exclusive = config.getExclusive();
String owner = config.getOwner();
- Map<String,Object> arguments = null;
-
- if(config.isLVQ() || config.getLVQKey() != null)
- {
- arguments = new HashMap<String,Object>();
- arguments.put(QPID_LAST_VALUE_QUEUE, 1);
- arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
- }
- else if (config.getPriority() || config.getPriorities() > 0)
- {
- arguments = new HashMap<String,Object>();
- arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
- }
- else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
- {
- arguments = new HashMap<String,Object>();
- arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
- }
- if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
- {
- if (arguments == null)
- {
- arguments = new HashMap<String,Object>();
- }
- arguments.put(X_QPID_DLQ_ENABLED, true);
- }
+ Map<String, Object> arguments = createQueueArgumentsFromConfig(config);
// we need queues that are defined in config to have deterministic ids.
- UUID id = UUIDGenerator.generateUUID(queueName, host.getName());
+ UUID id = UUIDGenerator.generateQueueUUID(queueName, host.getName());
AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments);
q.configure(config);
return q;
}
-
/**
* Validates DLQ and DLE names
* <p>
@@ -475,4 +429,43 @@ public class AMQQueueFactory
String dlExchangeName = name + serverConfig.getDeadLetterExchangeSuffix();
return dlExchangeName;
}
+
+ private static Map<String, Object> createQueueArgumentsFromConfig(QueueConfiguration config)
+ {
+ Map<String,Object> arguments = new HashMap<String,Object>();
+
+ if(config.isLVQ() || config.getLVQKey() != null)
+ {
+ arguments.put(QPID_LAST_VALUE_QUEUE, 1);
+ arguments.put(QPID_LAST_VALUE_QUEUE_KEY, config.getLVQKey() == null ? QPID_LVQ_KEY : config.getLVQKey());
+ }
+ else if (config.getPriority() || config.getPriorities() > 0)
+ {
+ arguments.put(X_QPID_PRIORITIES, config.getPriorities() < 0 ? 10 : config.getPriorities());
+ }
+ else if (config.getQueueSortKey() != null && !"".equals(config.getQueueSortKey()))
+ {
+ arguments.put(QPID_QUEUE_SORT_KEY, config.getQueueSortKey());
+ }
+
+ if (!config.getAutoDelete() && config.isDeadLetterQueueEnabled())
+ {
+ arguments.put(X_QPID_DLQ_ENABLED, true);
+ }
+
+ if (config.getDescription() != null && !"".equals(config.getDescription()))
+ {
+ arguments.put(X_QPID_DESCRIPTION, config.getDescription());
+ }
+
+ if (arguments.isEmpty())
+ {
+ return Collections.emptyMap();
+ }
+ else
+ {
+ return arguments;
+ }
+ }
+
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueueList.java Fri Aug 3 12:13:32 2012
@@ -24,17 +24,25 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import java.util.Collections;
+import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
public class ConflationQueueList extends SimpleQueueEntryList
{
+ private static final Logger LOGGER = LoggerFactory.getLogger(ConflationQueueList.class);
private final String _conflationKey;
private final ConcurrentHashMap<Object, AtomicReference<QueueEntry>> _latestValuesMap =
new ConcurrentHashMap<Object, AtomicReference<QueueEntry>>();
+ private final QueueEntry _deleteInProgress = new SimpleQueueEntryImpl(this);
+ private final QueueEntry _newerEntryAlreadyBeenAndGone = new SimpleQueueEntryImpl(this);
+
public ConflationQueueList(AMQQueue queue, String conflationKey)
{
super(queue);
@@ -52,48 +60,98 @@ public class ConflationQueueList extends
return new ConflationQueueEntry(this, message);
}
-
+ /**
+ * Updates the list using super.add and also updates {@link #_latestValuesMap} and discards entries as necessary.
+ */
@Override
public ConflationQueueEntry add(final ServerMessage message)
{
- ConflationQueueEntry entry = (ConflationQueueEntry) (super.add(message));
- AtomicReference<QueueEntry> latestValueReference = null;
+ final ConflationQueueEntry addedEntry = (ConflationQueueEntry) (super.add(message));
- Object value = message.getMessageHeader().getHeader(_conflationKey);
- if(value != null)
+ final Object keyValue = message.getMessageHeader().getHeader(_conflationKey);
+ if (keyValue != null)
{
- latestValueReference = _latestValuesMap.get(value);
- if(latestValueReference == null)
+ if(LOGGER.isDebugEnabled())
{
- _latestValuesMap.putIfAbsent(value, new AtomicReference<QueueEntry>(entry));
- latestValueReference = _latestValuesMap.get(value);
+ LOGGER.debug("Adding entry " + addedEntry + " for message " + message.getMessageNumber() + " with conflation key " + keyValue);
}
- QueueEntry oldEntry;
+ final AtomicReference<QueueEntry> referenceToEntry = new AtomicReference<QueueEntry>(addedEntry);
+ AtomicReference<QueueEntry> entryReferenceFromMap = null;
+ QueueEntry entryFromMap;
+
+ // Iterate until we have got a valid atomic reference object and either the referent is newer than the current
+ // entry, or the current entry has replaced it in the reference. Note that _deleteInProgress is a special value
+ // indicating that the reference object is no longer valid (it is being removed from the map).
+ boolean keepTryingToUpdateEntryReference = true;
do
{
- oldEntry = latestValueReference.get();
+ do
+ {
+ entryReferenceFromMap = getOrPutIfAbsent(keyValue, referenceToEntry);
+
+ // entryFromMap can be either an older entry, a newer entry (added recently by another thread), or addedEntry (if it's for a new key value)
+ entryFromMap = entryReferenceFromMap.get();
+ }
+ while(entryFromMap == _deleteInProgress);
+
+ boolean entryFromMapIsOlder = entryFromMap != _newerEntryAlreadyBeenAndGone && entryFromMap.compareTo(addedEntry) < 0;
+
+ keepTryingToUpdateEntryReference = entryFromMapIsOlder
+ && !entryReferenceFromMap.compareAndSet(entryFromMap, addedEntry);
}
- while(oldEntry.compareTo(entry) < 0 && !latestValueReference.compareAndSet(oldEntry, entry));
+ while(keepTryingToUpdateEntryReference);
- if(oldEntry.compareTo(entry) < 0)
+ if (entryFromMap == _newerEntryAlreadyBeenAndGone)
+ {
+ discardEntry(addedEntry);
+ }
+ else if (entryFromMap.compareTo(addedEntry) > 0)
{
- // We replaced some other entry to become the newest value
- if(oldEntry.acquire())
+ if(LOGGER.isDebugEnabled())
{
- discardEntry(oldEntry);
+ LOGGER.debug("New entry " + addedEntry.getEntryId() + " for message " + addedEntry.getMessage().getMessageNumber() + " being immediately discarded because a newer entry arrived. The newer entry is: " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
}
+ discardEntry(addedEntry);
}
- else if (oldEntry.compareTo(entry) > 0)
+ else if (entryFromMap.compareTo(addedEntry) < 0)
{
- // A newer entry came along
- discardEntry(entry);
-
+ if(LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Entry " + addedEntry + " for message " + addedEntry.getMessage().getMessageNumber() + " replacing older entry " + entryFromMap + " for message " + entryFromMap.getMessage().getMessageNumber());
+ }
+ discardEntry(entryFromMap);
}
+
+ addedEntry.setLatestValueReference(entryReferenceFromMap);
}
- entry.setLatestValueReference(latestValueReference);
- return entry;
+ return addedEntry;
+ }
+
+ /**
+ * Returns:
+ *
+ * <ul>
+ * <li>the existing entry reference if the value already exists in the map, or</li>
+ * <li>{@code referenceToAddedValue} if none exists, or</li>
+ * <li>a reference to {@link #_newerEntryAlreadyBeenAndGone} if another thread concurrently
+ * adds and removes during execution of this method.</li>
+ * </ul>
+ */
+ private AtomicReference<QueueEntry> getOrPutIfAbsent(final Object key, final AtomicReference<QueueEntry> referenceToAddedValue)
+ {
+ AtomicReference<QueueEntry> latestValueReference = _latestValuesMap.putIfAbsent(key, referenceToAddedValue);
+
+ if(latestValueReference == null)
+ {
+ latestValueReference = _latestValuesMap.get(key);
+ if(latestValueReference == null)
+ {
+ return new AtomicReference<QueueEntry>(_newerEntryAlreadyBeenAndGone);
+ }
+ }
+ return latestValueReference;
}
private void discardEntry(final QueueEntry entry)
@@ -104,11 +162,13 @@ public class ConflationQueueList extends
txn.dequeue(entry.getQueue(),entry.getMessage(),
new ServerTransaction.Action()
{
+ @Override
public void postCommit()
{
entry.discard();
}
+ @Override
public void onRollback()
{
@@ -120,7 +180,6 @@ public class ConflationQueueList extends
private final class ConflationQueueEntry extends SimpleQueueEntryImpl
{
-
private AtomicReference<QueueEntry> _latestValueReference;
public ConflationQueueEntry(SimpleQueueEntryList queueEntryList, ServerMessage message)
@@ -128,25 +187,56 @@ public class ConflationQueueList extends
super(queueEntryList, message);
}
-
+ @Override
public void release()
{
super.release();
- if(_latestValueReference != null)
+ discardIfReleasedEntryIsNoLongerLatest();
+ }
+
+ @Override
+ public boolean delete()
+ {
+ if(super.delete())
{
- if(_latestValueReference.get() != this)
+ if(_latestValueReference != null && _latestValueReference.compareAndSet(this, _deleteInProgress))
{
- discardEntry(this);
+ Object key = getMessageHeader().getHeader(_conflationKey);
+ _latestValuesMap.remove(key,_latestValueReference);
}
+ return true;
+ }
+ else
+ {
+ return false;
}
-
}
public void setLatestValueReference(final AtomicReference<QueueEntry> latestValueReference)
{
_latestValueReference = latestValueReference;
}
+
+ private void discardIfReleasedEntryIsNoLongerLatest()
+ {
+ if(_latestValueReference != null)
+ {
+ if(_latestValueReference.get() != this)
+ {
+ discardEntry(this);
+ }
+ }
+ }
+
+ }
+
+ /**
+ * Exposed for the purposes of unit testing only.
+ */
+ Map<Object, AtomicReference<QueueEntry>> getLatestValuesMap()
+ {
+ return Collections.unmodifiableMap(_latestValuesMap);
}
static class Factory implements QueueEntryListFactory
@@ -163,5 +253,4 @@ public class ConflationQueueList extends
return new ConflationQueueList(queue, _conflationKey);
}
}
-
}
Modified: qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java?rev=1368910&r1=1368909&r2=1368910&view=diff
==============================================================================
--- qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java (original)
+++ qpid/branches/asyncstore/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java Fri Aug 3 12:13:32 2012
@@ -20,12 +20,10 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,11 +31,11 @@ import java.util.concurrent.ConcurrentMa
public class DefaultQueueRegistry implements QueueRegistry
{
- private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
-
private ConcurrentMap<AMQShortString, AMQQueue> _queueMap = new ConcurrentHashMap<AMQShortString, AMQQueue>();
private final VirtualHost _virtualHost;
+ private final Collection<RegistryChangeListener> _listeners =
+ new ArrayList<RegistryChangeListener>();
public DefaultQueueRegistry(VirtualHost virtualHost)
{
@@ -52,11 +50,28 @@ public class DefaultQueueRegistry implem
public void registerQueue(AMQQueue queue)
{
_queueMap.put(queue.getNameShortString(), queue);
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.queueRegistered(queue);
+ }
+ }
}
public void unregisterQueue(AMQShortString name)
{
- _queueMap.remove(name);
+ AMQQueue q = _queueMap.remove(name);
+ if(q != null)
+ {
+ synchronized (_listeners)
+ {
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.queueUnregistered(q);
+ }
+ }
+ }
}
public AMQQueue getQueue(AMQShortString name)
@@ -79,19 +94,30 @@ public class DefaultQueueRegistry implem
return getQueue(new AMQShortString(queue));
}
+ public void addRegistryChangeListener(RegistryChangeListener listener)
+ {
+ synchronized(_listeners)
+ {
+ _listeners.add(listener);
+ }
+ }
+
@Override
public void stopAllAndUnregisterMBeans()
{
for (final AMQQueue queue : getQueues())
{
queue.stop();
- try
- {
- queue.getManagedObject().unregister();
- }
- catch (AMQException e)
+
+ //TODO: this is a bit of a hack, what if the listeners aren't aware
+ //that we are just unregistering the MBean because of HA, and aren't
+ //actually removing the queue as such.
+ synchronized (_listeners)
{
- LOGGER.warn("Failed to unregister mbean", e);
+ for(RegistryChangeListener listener : _listeners)
+ {
+ listener.queueUnregistered(queue);
+ }
}
}
_queueMap.clear();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org