You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/12 01:46:39 UTC
svn commit: r1631137 [1/4] - in
/qpid/branches/QPID-6125-ProtocolRefactoring/java:
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/...
Author: rgodfrey
Date: Sat Oct 11 23:46:39 2014
New Revision: 1631137
URL: http://svn.apache.org/r1631137
Log:
Move channel methods
Added:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java (with props)
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java (with props)
Removed:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
Modified:
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Oct 11 23:46:39 2014
@@ -105,8 +105,8 @@ import org.apache.qpid.server.util.Conne
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.TransportException;
-public class AMQChannel<T extends AMQProtocolSession<T>>
- implements AMQSessionModel<AMQChannel<T>,T>,
+public class AMQChannel
+ implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
AsyncAutoCommitTransaction.FutureRecorder
{
public static final int DEFAULT_PREFETCH = 4096;
@@ -158,7 +158,7 @@ public class AMQChannel<T extends AMQPro
private final AtomicLong _txnRejects = new AtomicLong(0);
private final AtomicLong _txnCount = new AtomicLong(0);
- private final T _session;
+ private final AMQProtocolEngine _connection;
private AtomicBoolean _closing = new AtomicBoolean(false);
private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -180,8 +180,8 @@ public class AMQChannel<T extends AMQPro
private final TransactionTimeoutHelper _transactionTimeoutHelper;
private final UUID _id = UUID.randomUUID();
- private final List<Action<? super AMQChannel<T>>> _taskList =
- new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+ private final List<Action<? super AMQChannel>> _taskList =
+ new CopyOnWriteArrayList<Action<? super AMQChannel>>();
private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
@@ -191,17 +191,18 @@ public class AMQChannel<T extends AMQPro
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private Session<?> _modelObject;
+ private ChannelMethodProcessor _channelMethodProcessor;
- public AMQChannel(T session, int channelId, final MessageStore messageStore)
+ public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
throws AMQException
{
- _session = session;
+ _connection = connection;
_channelId = channelId;
- _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(),
- session.getAuthorizedSubject().getPublicCredentials(),
- session.getAuthorizedSubject().getPrivateCredentials());
+ _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(),
+ connection.getAuthorizedSubject().getPublicCredentials(),
+ connection.getAuthorizedSubject().getPrivateCredentials());
_subject.getPrincipals().add(new SessionPrincipal(this));
_logSubject = new ChannelLogSubject(this);
@@ -210,7 +211,7 @@ public class AMQChannel<T extends AMQPro
// by default the session is non-transactional
_transaction = new AsyncAutoCommitTransaction(_messageStore, this);
- _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+ _clientDeliveryMethod = connection.createDeliveryMethod(_channelId);
_transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
{
@@ -238,6 +239,7 @@ public class AMQChannel<T extends AMQPro
return null;
}
});
+ _channelMethodProcessor = new ChannelMethodProcessorImpl(this);
}
@@ -249,7 +251,7 @@ public class AMQChannel<T extends AMQPro
@Override
public long getActivityTime()
{
- return _session.getLastReceivedTime();
+ return _connection.getLastReceivedTime();
}
});
_txnStarts.incrementAndGet();
@@ -354,7 +356,7 @@ public class AMQChannel<T extends AMQPro
final MessageMetaData messageMetaData =
new MessageMetaData(_currentMessage.getMessagePublishInfo(),
_currentMessage.getContentHeader(),
- getProtocolSession().getLastReceivedTime());
+ getConnection().getLastReceivedTime());
final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
@@ -429,7 +431,7 @@ public class AMQChannel<T extends AMQPro
{
long bodySize = _currentMessage.getSize();
long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
- _session.registerMessageReceived(bodySize, timestamp);
+ _connection.registerMessageReceived(bodySize, timestamp);
_currentMessage = null;
}
}
@@ -442,13 +444,13 @@ public class AMQChannel<T extends AMQPro
* Pre-requisite: the current message is judged to have no destination queues.
*
* @throws AMQConnectionException if the message is mandatory close-on-no-route
- * @see AMQProtocolSession#isCloseWhenNoRoute()
+ * @see AMQProtocolEngine#isCloseWhenNoRoute()
*/
private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
{
boolean mandatory = message.isMandatory();
String description = currentMessageDescription();
- boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
+ boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
if(_logger.isDebugEnabled())
{
@@ -457,13 +459,13 @@ public class AMQChannel<T extends AMQPro
description, mandatory, isTransactional(), closeOnNoRoute));
}
- if (mandatory && isTransactional() && _session.isCloseWhenNoRoute())
+ if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
{
throw new AMQConnectionException(
AMQConstant.NO_ROUTE,
"No route for message " + currentMessageDescription(),
0, 0, // default class and method ids
- getProtocolSession().getMethodRegistry(),
+ getConnection().getMethodRegistry(),
(Throwable) null);
}
@@ -564,9 +566,10 @@ public class AMQChannel<T extends AMQPro
*/
public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
FieldTable filters, boolean exclusive, boolean noLocal)
- throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
- MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
- MessageSource.ConsumerAccessRefused
+ throws MessageSource.ExistingConsumerPreventsExclusive,
+ MessageSource.ExistingExclusiveConsumer,
+ AMQInvalidArgumentException,
+ MessageSource.ConsumerAccessRefused, ConsumerTagInUseException
{
if (tag == null)
{
@@ -575,7 +578,7 @@ public class AMQChannel<T extends AMQPro
if (_tag2SubscriptionTargetMap.containsKey(tag))
{
- throw new AMQException("Consumer already exists with same tag: " + tag);
+ throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
}
ConsumerTarget_0_8 target;
@@ -647,27 +650,11 @@ public class AMQChannel<T extends AMQPro
}
}
}
- catch (AccessControlException e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (MessageSource.ExistingExclusiveConsumer e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (MessageSource.ExistingConsumerPreventsExclusive e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (AMQInvalidArgumentException e)
- {
- _tag2SubscriptionTargetMap.remove(tag);
- throw e;
- }
- catch (MessageSource.ConsumerAccessRefused e)
+ catch (AccessControlException
+ | MessageSource.ExistingExclusiveConsumer
+ | MessageSource.ExistingConsumerPreventsExclusive
+ | AMQInvalidArgumentException
+ | MessageSource.ConsumerAccessRefused e)
{
_tag2SubscriptionTargetMap.remove(tag);
throw e;
@@ -728,7 +715,7 @@ public class AMQChannel<T extends AMQPro
unsubscribeAllConsumers();
- for (Action<? super AMQChannel<T>> task : _taskList)
+ for (Action<? super AMQChannel> task : _taskList)
{
task.performAction(this);
}
@@ -895,9 +882,8 @@ public class AMQChannel<T extends AMQPro
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*
- * @throws AMQException When something goes wrong.
*/
- public void resend() throws AMQException
+ public void resend()
{
@@ -983,9 +969,8 @@ public class AMQChannel<T extends AMQPro
* @param multiple if true will acknowledge all messages up to and including the delivery tag. if false only
* acknowledges the single message specified by the delivery tag
*
- * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
*/
- public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+ public void acknowledgeMessage(long deliveryTag, boolean multiple)
{
Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
_transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
@@ -1082,22 +1067,13 @@ public class AMQChannel<T extends AMQPro
public boolean isSuspended()
{
- return _suspended.get() || _closing.get() || _session.isClosing();
- }
-
- public void commit() throws AMQException
- {
- commit(null, false);
+ return _suspended.get() || _closing.get() || _connection.isClosing();
}
- public void commit(final Runnable immediateAction, boolean async) throws AMQException
+ public void commit(final Runnable immediateAction, boolean async)
{
- if (!isTransactional())
- {
- throw new AMQException("Fatal error: commit called on non-transactional channel");
- }
if(async && _transaction instanceof LocalTransaction)
{
@@ -1130,17 +1106,8 @@ public class AMQChannel<T extends AMQPro
}
}
- public void rollback() throws AMQException
- {
- rollback(NULL_TASK);
- }
-
- public void rollback(Runnable postRollbackTask) throws AMQException
+ public void rollback(Runnable postRollbackTask)
{
- if (!isTransactional())
- {
- throw new AMQException("Fatal error: commit called on non-transactional channel");
- }
// stop all subscriptions
_rollingBack = true;
@@ -1198,7 +1165,7 @@ public class AMQChannel<T extends AMQPro
public String toString()
{
- return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
+ return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
}
public void setDefaultQueue(AMQQueue queue)
@@ -1217,9 +1184,9 @@ public class AMQChannel<T extends AMQPro
return _closing.get();
}
- public AMQProtocolSession getProtocolSession()
+ public AMQProtocolEngine getConnection()
{
- return _session;
+ return _connection;
}
public FlowCreditManager getCreditManager()
@@ -1262,7 +1229,7 @@ public class AMQChannel<T extends AMQPro
throws AMQException
{
- AMQMessage message = new AMQMessage(handle, _session.getReference());
+ AMQMessage message = new AMQMessage(handle, _connection.getReference());
final BasicContentHeaderProperties properties =
incomingMessage.getContentHeader().getProperties();
@@ -1273,7 +1240,7 @@ public class AMQChannel<T extends AMQPro
private boolean checkMessageUserId(ContentHeaderBody header)
{
AMQShortString userID = header.getProperties().getUserId();
- return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
+ return (!_messageAuthorizationRequired || _connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
}
@@ -1284,14 +1251,14 @@ public class AMQChannel<T extends AMQPro
}
@Override
- public T getConnectionModel()
+ public AMQProtocolEngine getConnectionModel()
{
- return _session;
+ return _connection;
}
public String getClientID()
{
- return String.valueOf(_session.getContextKey());
+ return String.valueOf(_connection.getContextKey());
}
public LogSubject getLogSubject()
@@ -1306,13 +1273,13 @@ public class AMQChannel<T extends AMQPro
}
@Override
- public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+ public void addDeleteTask(final Action<? super AMQChannel> task)
{
_taskList.add(task);
}
@Override
- public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+ public void removeDeleteTask(final Action<? super AMQChannel> task)
{
_taskList.remove(task);
}
@@ -1324,8 +1291,7 @@ public class AMQChannel<T extends AMQPro
public ChannelMethodProcessor getMethodProcessor()
{
- // TODO
- return null;
+ return _channelMethodProcessor;
}
@@ -1356,7 +1322,7 @@ public class AMQChannel<T extends AMQPro
public void postCommit()
{
final ProtocolOutputConverter outputConverter =
- _session.getProtocolOutputConverter();
+ _connection.getProtocolOutputConverter();
outputConverter.writeReturn(message.getMessagePublishInfo(),
message.getContentHeaderBody(),
@@ -1479,7 +1445,7 @@ public class AMQChannel<T extends AMQPro
public void postCommit()
{
AMQMessage message = _reference.getMessage();
- _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+ _connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
message.getContentHeaderBody(),
message,
_channelId,
@@ -1548,7 +1514,7 @@ public class AMQChannel<T extends AMQPro
@Override
public Object getConnectionReference()
{
- return getProtocolSession().getReference();
+ return getConnection().getReference();
}
public int getUnacknowledgedMessageCount()
@@ -1558,9 +1524,9 @@ public class AMQChannel<T extends AMQPro
private void flow(boolean flow)
{
- MethodRegistry methodRegistry = _session.getMethodRegistry();
+ MethodRegistry methodRegistry = _connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
- _session.writeFrame(responseBody.generateFrame(_channelId));
+ _connection.writeFrame(responseBody.generateFrame(_channelId));
}
@Override
@@ -1571,7 +1537,7 @@ public class AMQChannel<T extends AMQPro
public VirtualHostImpl getVirtualHost()
{
- return getProtocolSession().getVirtualHost();
+ return getConnection().getVirtualHost();
}
public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
@@ -1585,11 +1551,11 @@ public class AMQChannel<T extends AMQPro
*/
private void closeConnection(String reason) throws AMQException
{
- Lock receivedLock = _session.getReceivedLock();
+ Lock receivedLock = _connection.getReceivedLock();
receivedLock.lock();
try
{
- _session.close(AMQConstant.RESOURCE_ERROR, reason);
+ _connection.close(AMQConstant.RESOURCE_ERROR, reason);
}
finally
{
@@ -1597,7 +1563,7 @@ public class AMQChannel<T extends AMQPro
}
}
- public void deadLetter(long deliveryTag) throws AMQException
+ public void deadLetter(long deliveryTag)
{
final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Sat Oct 11 23:46:39 2014
@@ -57,6 +57,7 @@ import org.apache.qpid.framing.*;
import org.apache.qpid.properties.ConnectionStartProperties;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.protocol.ServerProtocolEngine;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -70,6 +71,7 @@ import org.apache.qpid.server.message.Se
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.SessionModelListener;
import org.apache.qpid.server.security.SubjectCreator;
@@ -84,7 +86,9 @@ import org.apache.qpid.transport.Transpo
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.util.BytesDataOutput;
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
+public class AMQProtocolEngine implements ServerProtocolEngine,
+ AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
+ AMQVersionAwareProtocolSession
{
private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
@@ -103,13 +107,12 @@ public class AMQProtocolEngine implement
private VirtualHostImpl<?,?,?> _virtualHost;
- private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
- new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
+ private final Map<Integer, AMQChannel> _channelMap =
+ new HashMap<Integer, AMQChannel>();
private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
new CopyOnWriteArrayList<SessionModelListener>();
- @SuppressWarnings("unchecked")
- private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
/**
* The channels that the latest call to {@link #received(ByteBuffer)} applied to.
@@ -118,8 +121,8 @@ public class AMQProtocolEngine implement
*
* Thread-safety: guarded by {@link #_receivedLock}.
*/
- private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
- new HashSet<AMQChannel<AMQProtocolEngine>>();
+ private final Set<AMQChannel> _channelsForCurrentMessage =
+ new HashSet<AMQChannel>();
private AMQDecoder _decoder;
@@ -365,7 +368,7 @@ public class AMQProtocolEngine implement
{
RuntimeException exception = null;
- for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage)
+ for (AMQChannel channel : _channelsForCurrentMessage)
{
try
{
@@ -428,7 +431,7 @@ public class AMQProtocolEngine implement
private void frameReceived(AMQFrame frame) throws AMQException
{
int channelId = frame.getChannel();
- AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId);
+ AMQChannel amqChannel = _channelMap.get(channelId);
if(amqChannel != null)
{
// The _receivedLock is already acquired in the caller
@@ -638,8 +641,17 @@ public class AMQProtocolEngine implement
_logger.info("Closing channel due to: " + e.getMessage());
}
- writeFrame(e.getCloseFrame(channelId));
- closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
+ AMQConstant errorType = e.getErrorCode();
+ if(errorType == null)
+ {
+ errorType = AMQConstant.INTERNAL_ERROR;
+ }
+ writeFrame(new AMQFrame(channelId,
+ getMethodRegistry().createChannelCloseBody(errorType.getCode(),
+ AMQShortString.validValueOf(e.getMessage()),
+ e.getClassId(),
+ e.getMethodId())));
+ closeChannel(channelId, errorType, e.getMessage());
}
else
{
@@ -730,7 +742,7 @@ public class AMQProtocolEngine implement
public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
{
- AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
+ AMQChannel channel = getAndAssertChannel(channelId);
channel.publishContentHeader(body);
@@ -738,7 +750,7 @@ public class AMQProtocolEngine implement
public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
{
- AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
+ AMQChannel channel = getAndAssertChannel(channelId);
channel.publishContentBody(body);
}
@@ -786,17 +798,17 @@ public class AMQProtocolEngine implement
_contextKey = contextKey;
}
- public List<AMQChannel<AMQProtocolEngine>> getChannels()
+ public List<AMQChannel> getChannels()
{
synchronized (_channelMap)
{
- return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values());
+ return new ArrayList<AMQChannel>(_channelMap.values());
}
}
- public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException
+ public AMQChannel getAndAssertChannel(int channelId) throws AMQException
{
- AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
+ AMQChannel channel = getChannel(channelId);
if (channel == null)
{
throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
@@ -805,9 +817,9 @@ public class AMQProtocolEngine implement
return channel;
}
- public AMQChannel<AMQProtocolEngine> getChannel(int channelId)
+ public AMQChannel getChannel(int channelId)
{
- final AMQChannel<AMQProtocolEngine> channel =
+ final AMQChannel channel =
((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
if ((channel == null) || channel.isClosing())
{
@@ -824,7 +836,7 @@ public class AMQProtocolEngine implement
return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
}
- public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException
+ public void addChannel(AMQChannel channel) throws AMQException
{
if (_closed)
{
@@ -891,52 +903,52 @@ public class AMQProtocolEngine implement
_maxNoOfChannels = value;
}
- /**
- * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
- * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
- *
- * @param channelId id of the channel to close
- *
- * @throws IllegalArgumentException if the channel id is not valid
- */
- @Override
- public void closeChannel(int channelId)
+
+ void closeChannel(AMQChannel channel)
{
- closeChannel(channelId, null, null);
+ closeChannel(channel, null, null, false);
+ }
+
+ public void closeChannelAndWriteFrame(AMQChannel channel, AMQConstant cause, String message)
+ {
+ writeFrame(new AMQFrame(channel.getChannelId(),
+ getMethodRegistry().createChannelCloseBody(cause.getCode(),
+ AMQShortString.validValueOf(message),
+ _methodProcessor.getClassId(),
+ _methodProcessor.getMethodId())));
+ closeChannel(channel, cause, message, true);
}
public void closeChannel(int channelId, AMQConstant cause, String message)
{
- final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
+ final AMQChannel channel = getChannel(channelId);
if (channel == null)
{
throw new IllegalArgumentException("Unknown channel id");
}
- else
+ closeChannel(channel, cause, message, true);
+ }
+
+ void closeChannel(AMQChannel channel, AMQConstant cause, String message, boolean mark)
+ {
+ int channelId = channel.getChannelId();
+ try
{
- try
+ channel.close(cause, message);
+ if(mark)
{
- channel.close(cause, message);
markChannelAwaitingCloseOk(channelId);
}
- finally
- {
- removeChannel(channelId);
- }
+ }
+ finally
+ {
+ removeChannel(channelId);
}
}
+
public void closeChannelOk(int channelId)
{
- // todo QPID-847 - This is called from two locations ChannelCloseHandler and ChannelCloseOkHandler.
- // When it is the CC_OK_Handler then it makes sense to remove the channel else we will leak memory.
- // We do it from the Close Handler as we are sending the OK back to the client.
- // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
- // will send a close-ok.. Where we should call removeChannel.
- // However, due to the poor exception handling on the client. The client-user will be notified of the
- // InvalidArgument and if they then decide to close the session/connection then the there will be time
- // for that to occur i.e. a new close method be sent before the exception handling can mark the session closed.
-
_closingChannelsList.remove(channelId);
}
@@ -952,7 +964,7 @@ public class AMQProtocolEngine implement
*/
public void removeChannel(int channelId)
{
- AMQChannel<AMQProtocolEngine> session;
+ AMQChannel session;
synchronized (_channelMap)
{
session = _channelMap.remove(channelId);
@@ -988,7 +1000,7 @@ public class AMQProtocolEngine implement
*/
private void closeAllChannels()
{
- for (AMQChannel<AMQProtocolEngine> channel : getChannels())
+ for (AMQChannel channel : getChannels())
{
channel.close();
}
@@ -1003,7 +1015,6 @@ public class AMQProtocolEngine implement
}
/** This must be called when the session is _closed in order to free up any resources managed by the session. */
- @Override
public void closeSession()
{
@@ -1103,16 +1114,14 @@ public class AMQProtocolEngine implement
void closeConnection(AMQConstant errorCode,
- String message, int channelId,
- int classId,
- int methodId)
+ String message, int channelId)
{
if (_logger.isInfoEnabled())
{
_logger.info("Closing connection due to: " + message);
}
- closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), classId, methodId)));
+ closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _methodProcessor.getClassId(), _methodProcessor.getMethodId())));
}
private void closeConnection(int channelId, AMQFrame frame)
@@ -1137,7 +1146,6 @@ public class AMQProtocolEngine implement
}
- @Override
public void closeProtocolSession()
{
_network.close();
@@ -1520,7 +1528,7 @@ public class AMQProtocolEngine implement
return String.valueOf(getRemoteAddress());
}
- public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
+ public void closeSession(AMQChannel session, AMQConstant cause, String message)
{
int channelId = session.getChannelId();
closeChannel(channelId, cause, message);
@@ -1549,7 +1557,7 @@ public class AMQProtocolEngine implement
if(!_blocking)
{
_blocking = true;
- for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
+ for(AMQChannel channel : _channelMap.values())
{
channel.block();
}
@@ -1564,7 +1572,7 @@ public class AMQProtocolEngine implement
if(_blocking)
{
_blocking = false;
- for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
+ for(AMQChannel channel : _channelMap.values())
{
channel.unblock();
}
@@ -1577,9 +1585,9 @@ public class AMQProtocolEngine implement
return _closed;
}
- public List<AMQChannel<AMQProtocolEngine>> getSessionModels()
+ public List<AMQChannel> getSessionModels()
{
- return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels());
+ return new ArrayList<AMQChannel>(getChannels());
}
public LogSubject getLogSubject()
@@ -1715,31 +1723,26 @@ public class AMQProtocolEngine implement
return _lastWriteTime.get();
}
- @Override
public boolean isCloseWhenNoRoute()
{
return _closeWhenNoRoute;
}
- @Override
public boolean isCompressionSupported()
{
return _compressionSupported && _broker.isMessageCompressionEnabled();
}
- @Override
public int getMessageCompressionThreshold()
{
return _messageCompressionThreshold;
}
- @Override
public Broker<?> getBroker()
{
return _broker;
}
- @Override
public SubjectCreator getSubjectCreator()
{
return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure());
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java Sat Oct 11 23:46:39 2014
@@ -20,6 +20,90 @@
*/
package org.apache.qpid.server.protocol.v0_8;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
public interface ChannelMethodProcessor
{
+ void receiveAccessRequest(AMQShortString realm,
+ boolean exclusive,
+ boolean passive,
+ boolean active,
+ boolean write,
+ boolean read);
+
+ void receiveBasicAck(long deliveryTag, boolean multiple);
+
+ void receiveBasicCancel(AMQShortString consumerTag, boolean nowait);
+
+ void receiveBasicConsume(AMQShortString queue,
+ AMQShortString consumerTag,
+ boolean noLocal,
+ boolean noAck,
+ boolean exclusive,
+ boolean nowait,
+ FieldTable arguments);
+
+ void receiveBasicGet(AMQShortString queue, boolean noAck);
+
+ void receiveBasicPublish(AMQShortString exchange,
+ AMQShortString routingKey,
+ boolean mandatory,
+ boolean immediate);
+
+ void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global);
+
+ void receiveBasicRecover(boolean requeue, boolean sync);
+
+ void receiveBasicReject(long deliveryTag, boolean requeue);
+
+ void receiveChannelClose();
+
+ void receiveChannelCloseOk();
+
+ void receiveChannelFlow(boolean active);
+
+ void receiveExchangeBound(AMQShortString exchange, AMQShortString queue, AMQShortString routingKey);
+
+ void receiveExchangeDeclare(AMQShortString exchange,
+ AMQShortString type,
+ boolean passive,
+ boolean durable,
+ boolean autoDelete,
+ boolean internal,
+ boolean nowait,
+ FieldTable arguments);
+
+ void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait);
+
+ void receiveQueueBind(AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ boolean nowait,
+ FieldTable arguments);
+
+ void receiveQueueDeclare(AMQShortString queueStr,
+ boolean passive,
+ boolean durable,
+ boolean exclusive,
+ boolean autoDelete,
+ boolean nowait,
+ FieldTable arguments);
+
+ void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
+
+ void receiveQueuePurge(AMQShortString queue, boolean nowait);
+
+ void receiveQueueUnbind(AMQShortString queue,
+ AMQShortString exchange,
+ AMQShortString routingKey,
+ FieldTable arguments);
+
+ void receiveTxSelect();
+
+ void receiveTxCommit();
+
+ void receiveTxRollback();
+
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org