You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/10/12 01:46:39 UTC

svn commit: r1631137 [1/4] - in /qpid/branches/QPID-6125-ProtocolRefactoring/java: broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/...

Author: rgodfrey
Date: Sat Oct 11 23:46:39 2014
New Revision: 1631137

URL: http://svn.apache.org/r1631137
Log:
Move channel methods

Added:
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessorImpl.java   (with props)
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTagInUseException.java   (with props)
Removed:
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolSession.java
Modified:
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ProtocolOutputConverterImpl.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodDispatcherImpl.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ServerMethodProcessor.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMap.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/UnacknowledgedMessageMapImpl.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AMQChannelTest.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AcknowledgeTest.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/BrokerTestHelper_0_8.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/QueueBrowserUsesNoAckTest.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/ExchangeBoundOkBody.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/FrameCreatingMethodProcessor.java
    qpid/branches/QPID-6125-ProtocolRefactoring/java/common/src/main/java/org/apache/qpid/framing/MethodProcessor.java

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java Sat Oct 11 23:46:39 2014
@@ -105,8 +105,8 @@ import org.apache.qpid.server.util.Conne
 import org.apache.qpid.server.virtualhost.VirtualHostImpl;
 import org.apache.qpid.transport.TransportException;
 
-public class AMQChannel<T extends AMQProtocolSession<T>>
-        implements AMQSessionModel<AMQChannel<T>,T>,
+public class AMQChannel
+        implements AMQSessionModel<AMQChannel, AMQProtocolEngine>,
                    AsyncAutoCommitTransaction.FutureRecorder
 {
     public static final int DEFAULT_PREFETCH = 4096;
@@ -158,7 +158,7 @@ public class AMQChannel<T extends AMQPro
     private final AtomicLong _txnRejects = new AtomicLong(0);
     private final AtomicLong _txnCount = new AtomicLong(0);
 
-    private final T _session;
+    private final AMQProtocolEngine _connection;
     private AtomicBoolean _closing = new AtomicBoolean(false);
 
     private final Set<Object> _blockingEntities = Collections.synchronizedSet(new HashSet<Object>());
@@ -180,8 +180,8 @@ public class AMQChannel<T extends AMQPro
     private final TransactionTimeoutHelper _transactionTimeoutHelper;
     private final UUID _id = UUID.randomUUID();
 
-    private final List<Action<? super AMQChannel<T>>> _taskList =
-            new CopyOnWriteArrayList<Action<? super AMQChannel<T>>>();
+    private final List<Action<? super AMQChannel>> _taskList =
+            new CopyOnWriteArrayList<Action<? super AMQChannel>>();
 
 
     private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction();
@@ -191,17 +191,18 @@ public class AMQChannel<T extends AMQPro
     private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
     private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
     private Session<?> _modelObject;
+    private ChannelMethodProcessor _channelMethodProcessor;
 
 
-    public AMQChannel(T session, int channelId, final MessageStore messageStore)
+    public AMQChannel(AMQProtocolEngine connection, int channelId, final MessageStore messageStore)
             throws AMQException
     {
-        _session = session;
+        _connection = connection;
         _channelId = channelId;
 
-        _subject = new Subject(false, session.getAuthorizedSubject().getPrincipals(),
-                               session.getAuthorizedSubject().getPublicCredentials(),
-                               session.getAuthorizedSubject().getPrivateCredentials());
+        _subject = new Subject(false, connection.getAuthorizedSubject().getPrincipals(),
+                               connection.getAuthorizedSubject().getPublicCredentials(),
+                               connection.getAuthorizedSubject().getPrivateCredentials());
         _subject.getPrincipals().add(new SessionPrincipal(this));
         _logSubject = new ChannelLogSubject(this);
 
@@ -210,7 +211,7 @@ public class AMQChannel<T extends AMQPro
         // by default the session is non-transactional
         _transaction = new AsyncAutoCommitTransaction(_messageStore, this);
 
-        _clientDeliveryMethod = session.createDeliveryMethod(_channelId);
+        _clientDeliveryMethod = connection.createDeliveryMethod(_channelId);
 
         _transactionTimeoutHelper = new TransactionTimeoutHelper(_logSubject, new CloseAction()
         {
@@ -238,6 +239,7 @@ public class AMQChannel<T extends AMQPro
                 return null;
             }
         });
+        _channelMethodProcessor = new ChannelMethodProcessorImpl(this);
 
     }
 
@@ -249,7 +251,7 @@ public class AMQChannel<T extends AMQPro
             @Override
             public long getActivityTime()
             {
-                return _session.getLastReceivedTime();
+                return _connection.getLastReceivedTime();
             }
         });
         _txnStarts.incrementAndGet();
@@ -354,7 +356,7 @@ public class AMQChannel<T extends AMQPro
                 final MessageMetaData messageMetaData =
                         new MessageMetaData(_currentMessage.getMessagePublishInfo(),
                                             _currentMessage.getContentHeader(),
-                                            getProtocolSession().getLastReceivedTime());
+                                            getConnection().getLastReceivedTime());
 
                 final StoredMessage<MessageMetaData> handle = _messageStore.addMessage(messageMetaData);
                 final AMQMessage amqMessage = createAMQMessage(_currentMessage, handle);
@@ -429,7 +431,7 @@ public class AMQChannel<T extends AMQPro
             {
                 long bodySize = _currentMessage.getSize();
                 long timestamp = _currentMessage.getContentHeader().getProperties().getTimestamp();
-                _session.registerMessageReceived(bodySize, timestamp);
+                _connection.registerMessageReceived(bodySize, timestamp);
                 _currentMessage = null;
             }
         }
@@ -442,13 +444,13 @@ public class AMQChannel<T extends AMQPro
      * Pre-requisite: the current message is judged to have no destination queues.
      *
      * @throws AMQConnectionException if the message is mandatory close-on-no-route
-     * @see AMQProtocolSession#isCloseWhenNoRoute()
+     * @see AMQProtocolEngine#isCloseWhenNoRoute()
      */
     private void handleUnroutableMessage(AMQMessage message) throws AMQConnectionException
     {
         boolean mandatory = message.isMandatory();
         String description = currentMessageDescription();
-        boolean closeOnNoRoute = _session.isCloseWhenNoRoute();
+        boolean closeOnNoRoute = _connection.isCloseWhenNoRoute();
 
         if(_logger.isDebugEnabled())
         {
@@ -457,13 +459,13 @@ public class AMQChannel<T extends AMQPro
                     description, mandatory, isTransactional(), closeOnNoRoute));
         }
 
-        if (mandatory && isTransactional() && _session.isCloseWhenNoRoute())
+        if (mandatory && isTransactional() && _connection.isCloseWhenNoRoute())
         {
             throw new AMQConnectionException(
                     AMQConstant.NO_ROUTE,
                     "No route for message " + currentMessageDescription(),
                     0, 0, // default class and method ids
-                    getProtocolSession().getMethodRegistry(),
+                    getConnection().getMethodRegistry(),
                     (Throwable) null);
         }
 
@@ -564,9 +566,10 @@ public class AMQChannel<T extends AMQPro
      */
     public AMQShortString consumeFromSource(AMQShortString tag, Collection<MessageSource> sources, boolean acks,
                                             FieldTable filters, boolean exclusive, boolean noLocal)
-            throws AMQException, MessageSource.ExistingConsumerPreventsExclusive,
-                   MessageSource.ExistingExclusiveConsumer, AMQInvalidArgumentException,
-                   MessageSource.ConsumerAccessRefused
+            throws MessageSource.ExistingConsumerPreventsExclusive,
+                   MessageSource.ExistingExclusiveConsumer,
+                   AMQInvalidArgumentException,
+                   MessageSource.ConsumerAccessRefused, ConsumerTagInUseException
     {
         if (tag == null)
         {
@@ -575,7 +578,7 @@ public class AMQChannel<T extends AMQPro
 
         if (_tag2SubscriptionTargetMap.containsKey(tag))
         {
-            throw new AMQException("Consumer already exists with same tag: " + tag);
+            throw new ConsumerTagInUseException("Consumer already exists with same tag: " + tag);
         }
 
         ConsumerTarget_0_8 target;
@@ -647,27 +650,11 @@ public class AMQChannel<T extends AMQPro
                 }
             }
         }
-        catch (AccessControlException e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (MessageSource.ExistingExclusiveConsumer e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (MessageSource.ExistingConsumerPreventsExclusive e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (AMQInvalidArgumentException e)
-        {
-            _tag2SubscriptionTargetMap.remove(tag);
-            throw e;
-        }
-        catch (MessageSource.ConsumerAccessRefused e)
+        catch (AccessControlException
+                | MessageSource.ExistingExclusiveConsumer
+                | MessageSource.ExistingConsumerPreventsExclusive
+                | AMQInvalidArgumentException
+                | MessageSource.ConsumerAccessRefused e)
         {
             _tag2SubscriptionTargetMap.remove(tag);
             throw e;
@@ -728,7 +715,7 @@ public class AMQChannel<T extends AMQPro
 
         unsubscribeAllConsumers();
 
-        for (Action<? super AMQChannel<T>> task : _taskList)
+        for (Action<? super AMQChannel> task : _taskList)
         {
             task.performAction(this);
         }
@@ -895,9 +882,8 @@ public class AMQChannel<T extends AMQPro
     /**
      * Called to resend all outstanding unacknowledged messages to this same channel.
      *
-     * @throws AMQException When something goes wrong.
      */
-    public void resend() throws AMQException
+    public void resend()
     {
 
 
@@ -983,9 +969,8 @@ public class AMQChannel<T extends AMQPro
      * @param multiple    if true will acknowledge all messages up to an including the delivery tag. if false only
      *                    acknowledges the single message specified by the delivery tag
      *
-     * @throws AMQException if the delivery tag is unknown (e.g. not outstanding) on this channel
      */
-    public void acknowledgeMessage(long deliveryTag, boolean multiple) throws AMQException
+    public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
         Collection<MessageInstance> ackedMessages = getAckedMessages(deliveryTag, multiple);
         _transaction.dequeue(ackedMessages, new MessageAcknowledgeAction(ackedMessages));
@@ -1082,22 +1067,13 @@ public class AMQChannel<T extends AMQPro
 
     public boolean isSuspended()
     {
-        return _suspended.get()  || _closing.get() || _session.isClosing();
-    }
-
-    public void commit() throws AMQException
-    {
-        commit(null, false);
+        return _suspended.get()  || _closing.get() || _connection.isClosing();
     }
 
 
-    public void commit(final Runnable immediateAction, boolean async) throws AMQException
+    public void commit(final Runnable immediateAction, boolean async)
     {
 
-        if (!isTransactional())
-        {
-            throw new AMQException("Fatal error: commit called on non-transactional channel");
-        }
 
         if(async && _transaction instanceof LocalTransaction)
         {
@@ -1130,17 +1106,8 @@ public class AMQChannel<T extends AMQPro
         }
     }
 
-    public void rollback() throws AMQException
-    {
-        rollback(NULL_TASK);
-    }
-
-    public void rollback(Runnable postRollbackTask) throws AMQException
+    public void rollback(Runnable postRollbackTask)
     {
-        if (!isTransactional())
-        {
-            throw new AMQException("Fatal error: commit called on non-transactional channel");
-        }
 
         // stop all subscriptions
         _rollingBack = true;
@@ -1198,7 +1165,7 @@ public class AMQChannel<T extends AMQPro
 
     public String toString()
     {
-        return "("+ _suspended.get() + ", " + _closing.get() + ", " + _session.isClosing() + ") "+"["+_session.toString()+":"+_channelId+"]";
+        return "("+ _suspended.get() + ", " + _closing.get() + ", " + _connection.isClosing() + ") "+"["+ _connection.toString()+":"+_channelId+"]";
     }
 
     public void setDefaultQueue(AMQQueue queue)
@@ -1217,9 +1184,9 @@ public class AMQChannel<T extends AMQPro
         return _closing.get();
     }
 
-    public AMQProtocolSession getProtocolSession()
+    public AMQProtocolEngine getConnection()
     {
-        return _session;
+        return _connection;
     }
 
     public FlowCreditManager getCreditManager()
@@ -1262,7 +1229,7 @@ public class AMQChannel<T extends AMQPro
             throws AMQException
     {
 
-        AMQMessage message = new AMQMessage(handle, _session.getReference());
+        AMQMessage message = new AMQMessage(handle, _connection.getReference());
 
         final BasicContentHeaderProperties properties =
                   incomingMessage.getContentHeader().getProperties();
@@ -1273,7 +1240,7 @@ public class AMQChannel<T extends AMQPro
     private boolean checkMessageUserId(ContentHeaderBody header)
     {
         AMQShortString userID = header.getProperties().getUserId();
-        return (!_messageAuthorizationRequired || _session.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
+        return (!_messageAuthorizationRequired || _connection.getAuthorizedPrincipal().getName().equals(userID == null? "" : userID.toString()));
 
     }
 
@@ -1284,14 +1251,14 @@ public class AMQChannel<T extends AMQPro
     }
 
     @Override
-    public T getConnectionModel()
+    public AMQProtocolEngine getConnectionModel()
     {
-        return _session;
+        return _connection;
     }
 
     public String getClientID()
     {
-        return String.valueOf(_session.getContextKey());
+        return String.valueOf(_connection.getContextKey());
     }
 
     public LogSubject getLogSubject()
@@ -1306,13 +1273,13 @@ public class AMQChannel<T extends AMQPro
     }
 
     @Override
-    public void addDeleteTask(final Action<? super AMQChannel<T>> task)
+    public void addDeleteTask(final Action<? super AMQChannel> task)
     {
         _taskList.add(task);
     }
 
     @Override
-    public void removeDeleteTask(final Action<? super AMQChannel<T>> task)
+    public void removeDeleteTask(final Action<? super AMQChannel> task)
     {
         _taskList.remove(task);
     }
@@ -1324,8 +1291,7 @@ public class AMQChannel<T extends AMQPro
 
     public ChannelMethodProcessor getMethodProcessor()
     {
-        // TODO
-        return null;
+        return _channelMethodProcessor;
     }
 
 
@@ -1356,7 +1322,7 @@ public class AMQChannel<T extends AMQPro
                                     public void postCommit()
                                     {
                                         final ProtocolOutputConverter outputConverter =
-                                                    _session.getProtocolOutputConverter();
+                                                    _connection.getProtocolOutputConverter();
 
                                         outputConverter.writeReturn(message.getMessagePublishInfo(),
                                                                     message.getContentHeaderBody(),
@@ -1479,7 +1445,7 @@ public class AMQChannel<T extends AMQPro
         public void postCommit()
         {
             AMQMessage message = _reference.getMessage();
-            _session.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
+            _connection.getProtocolOutputConverter().writeReturn(message.getMessagePublishInfo(),
                                                           message.getContentHeaderBody(),
                                                           message,
                                                           _channelId,
@@ -1548,7 +1514,7 @@ public class AMQChannel<T extends AMQPro
     @Override
     public Object getConnectionReference()
     {
-        return getProtocolSession().getReference();
+        return getConnection().getReference();
     }
 
     public int getUnacknowledgedMessageCount()
@@ -1558,9 +1524,9 @@ public class AMQChannel<T extends AMQPro
 
     private void flow(boolean flow)
     {
-        MethodRegistry methodRegistry = _session.getMethodRegistry();
+        MethodRegistry methodRegistry = _connection.getMethodRegistry();
         AMQMethodBody responseBody = methodRegistry.createChannelFlowBody(flow);
-        _session.writeFrame(responseBody.generateFrame(_channelId));
+        _connection.writeFrame(responseBody.generateFrame(_channelId));
     }
 
     @Override
@@ -1571,7 +1537,7 @@ public class AMQChannel<T extends AMQPro
 
     public VirtualHostImpl getVirtualHost()
     {
-        return getProtocolSession().getVirtualHost();
+        return getConnection().getVirtualHost();
     }
 
     public void checkTransactionStatus(long openWarn, long openClose, long idleWarn, long idleClose)
@@ -1585,11 +1551,11 @@ public class AMQChannel<T extends AMQPro
      */
     private void closeConnection(String reason) throws AMQException
     {
-        Lock receivedLock = _session.getReceivedLock();
+        Lock receivedLock = _connection.getReceivedLock();
         receivedLock.lock();
         try
         {
-            _session.close(AMQConstant.RESOURCE_ERROR, reason);
+            _connection.close(AMQConstant.RESOURCE_ERROR, reason);
         }
         finally
         {
@@ -1597,7 +1563,7 @@ public class AMQChannel<T extends AMQPro
         }
     }
 
-    public void deadLetter(long deliveryTag) throws AMQException
+    public void deadLetter(long deliveryTag)
     {
         final UnacknowledgedMessageMap unackedMap = getUnacknowledgedMessageMap();
         final MessageInstance rejectedQueueEntry = unackedMap.remove(deliveryTag);

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java Sat Oct 11 23:46:39 2014
@@ -57,6 +57,7 @@ import org.apache.qpid.framing.*;
 import org.apache.qpid.properties.ConnectionStartProperties;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.protocol.ServerProtocolEngine;
 import org.apache.qpid.server.configuration.BrokerProperties;
 import org.apache.qpid.server.connection.ConnectionPrincipal;
@@ -70,6 +71,7 @@ import org.apache.qpid.server.message.Se
 import org.apache.qpid.server.model.Broker;
 import org.apache.qpid.server.model.Port;
 import org.apache.qpid.server.model.Transport;
+import org.apache.qpid.server.protocol.AMQConnectionModel;
 import org.apache.qpid.server.protocol.AMQSessionModel;
 import org.apache.qpid.server.protocol.SessionModelListener;
 import org.apache.qpid.server.security.SubjectCreator;
@@ -84,7 +86,9 @@ import org.apache.qpid.transport.Transpo
 import org.apache.qpid.transport.network.NetworkConnection;
 import org.apache.qpid.util.BytesDataOutput;
 
-public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSession<AMQProtocolEngine>
+public class AMQProtocolEngine implements ServerProtocolEngine,
+                                          AMQConnectionModel<AMQProtocolEngine, AMQChannel>,
+                                          AMQVersionAwareProtocolSession
 {
     private static final Logger _logger = Logger.getLogger(AMQProtocolEngine.class);
 
@@ -103,13 +107,12 @@ public class AMQProtocolEngine implement
 
     private VirtualHostImpl<?,?,?> _virtualHost;
 
-    private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap =
-            new HashMap<Integer, AMQChannel<AMQProtocolEngine>>();
+    private final Map<Integer, AMQChannel> _channelMap =
+            new HashMap<Integer, AMQChannel>();
     private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners =
             new CopyOnWriteArrayList<SessionModelListener>();
 
-    @SuppressWarnings("unchecked")
-    private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+    private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
 
     /**
      * The channels that the latest call to {@link #received(ByteBuffer)} applied to.
@@ -118,8 +121,8 @@ public class AMQProtocolEngine implement
      *
      * Thread-safety: guarded by {@link #_receivedLock}.
      */
-    private final Set<AMQChannel<AMQProtocolEngine>> _channelsForCurrentMessage =
-            new HashSet<AMQChannel<AMQProtocolEngine>>();
+    private final Set<AMQChannel> _channelsForCurrentMessage =
+            new HashSet<AMQChannel>();
 
     private AMQDecoder _decoder;
 
@@ -365,7 +368,7 @@ public class AMQProtocolEngine implement
     {
         RuntimeException exception = null;
 
-        for (AMQChannel<AMQProtocolEngine> channel : _channelsForCurrentMessage)
+        for (AMQChannel channel : _channelsForCurrentMessage)
         {
             try
             {
@@ -428,7 +431,7 @@ public class AMQProtocolEngine implement
     private void frameReceived(AMQFrame frame) throws AMQException
     {
         int channelId = frame.getChannel();
-        AMQChannel<AMQProtocolEngine> amqChannel = _channelMap.get(channelId);
+        AMQChannel amqChannel = _channelMap.get(channelId);
         if(amqChannel != null)
         {
             // The _receivedLock is already acquired in the caller
@@ -638,8 +641,17 @@ public class AMQProtocolEngine implement
                         _logger.info("Closing channel due to: " + e.getMessage());
                     }
 
-                    writeFrame(e.getCloseFrame(channelId));
-                    closeChannel(channelId, e.getErrorCode() == null ? AMQConstant.INTERNAL_ERROR : e.getErrorCode(), e.getMessage());
+                    AMQConstant errorType = e.getErrorCode();
+                    if(errorType == null)
+                    {
+                        errorType = AMQConstant.INTERNAL_ERROR;
+                    }
+                    writeFrame(new AMQFrame(channelId,
+                                            getMethodRegistry().createChannelCloseBody(errorType.getCode(),
+                                                                                       AMQShortString.validValueOf(e.getMessage()),
+                                                                                       e.getClassId(),
+                                                                                       e.getMethodId())));
+                    closeChannel(channelId, errorType, e.getMessage());
                 }
                 else
                 {
@@ -730,7 +742,7 @@ public class AMQProtocolEngine implement
     public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException
     {
 
-        AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
+        AMQChannel channel = getAndAssertChannel(channelId);
 
         channel.publishContentHeader(body);
 
@@ -738,7 +750,7 @@ public class AMQProtocolEngine implement
 
     public void contentBodyReceived(int channelId, ContentBody body) throws AMQException
     {
-        AMQChannel<AMQProtocolEngine> channel = getAndAssertChannel(channelId);
+        AMQChannel channel = getAndAssertChannel(channelId);
 
         channel.publishContentBody(body);
     }
@@ -786,17 +798,17 @@ public class AMQProtocolEngine implement
         _contextKey = contextKey;
     }
 
-    public List<AMQChannel<AMQProtocolEngine>> getChannels()
+    public List<AMQChannel> getChannels()
     {
         synchronized (_channelMap)
         {
-            return new ArrayList<AMQChannel<AMQProtocolEngine>>(_channelMap.values());
+            return new ArrayList<AMQChannel>(_channelMap.values());
         }
     }
 
-    public AMQChannel<AMQProtocolEngine> getAndAssertChannel(int channelId) throws AMQException
+    public AMQChannel getAndAssertChannel(int channelId) throws AMQException
     {
-        AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
+        AMQChannel channel = getChannel(channelId);
         if (channel == null)
         {
             throw new AMQException(AMQConstant.NOT_FOUND, "Channel not found with id:" + channelId);
@@ -805,9 +817,9 @@ public class AMQProtocolEngine implement
         return channel;
     }
 
-    public AMQChannel<AMQProtocolEngine> getChannel(int channelId)
+    public AMQChannel getChannel(int channelId)
     {
-        final AMQChannel<AMQProtocolEngine> channel =
+        final AMQChannel channel =
                 ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
         if ((channel == null) || channel.isClosing())
         {
@@ -824,7 +836,7 @@ public class AMQProtocolEngine implement
         return !_closingChannelsList.isEmpty() && _closingChannelsList.containsKey(channelId);
     }
 
-    public void addChannel(AMQChannel<AMQProtocolEngine> channel) throws AMQException
+    public void addChannel(AMQChannel channel) throws AMQException
     {
         if (_closed)
         {
@@ -891,52 +903,52 @@ public class AMQProtocolEngine implement
         _maxNoOfChannels = value;
     }
 
-    /**
-     * Close a specific channel. This will remove any resources used by the channel, including: <ul><li>any queue
-     * subscriptions (this may in turn remove queues if they are auto delete</li> </ul>
-     *
-     * @param channelId id of the channel to close
-     *
-     * @throws IllegalArgumentException if the channel id is not valid
-     */
-    @Override
-    public void closeChannel(int channelId)
+
+    void closeChannel(AMQChannel channel)
     {
-        closeChannel(channelId, null, null);
+        closeChannel(channel, null, null, false);
+    }
+
+    public void closeChannelAndWriteFrame(AMQChannel channel, AMQConstant cause, String message)
+    {
+        writeFrame(new AMQFrame(channel.getChannelId(),
+                                getMethodRegistry().createChannelCloseBody(cause.getCode(),
+                                                                           AMQShortString.validValueOf(message),
+                                                                           _methodProcessor.getClassId(),
+                                                                           _methodProcessor.getMethodId())));
+        closeChannel(channel, cause, message, true);
     }
 
     public void closeChannel(int channelId, AMQConstant cause, String message)
     {
-        final AMQChannel<AMQProtocolEngine> channel = getChannel(channelId);
+        final AMQChannel channel = getChannel(channelId);
         if (channel == null)
         {
             throw new IllegalArgumentException("Unknown channel id");
         }
-        else
+        closeChannel(channel, cause, message, true);
+    }
+
+    void closeChannel(AMQChannel channel, AMQConstant cause, String message, boolean mark)
+    {
+        int channelId = channel.getChannelId();
+        try
         {
-            try
+            channel.close(cause, message);
+            if(mark)
             {
-                channel.close(cause, message);
                 markChannelAwaitingCloseOk(channelId);
             }
-            finally
-            {
-                removeChannel(channelId);
-            }
+        }
+        finally
+        {
+            removeChannel(channelId);
         }
     }
 
+
     public void closeChannelOk(int channelId)
     {
-        // todo QPID-847 - This is called from two locations ChannelCloseHandler and ChannelCloseOkHandler.
-        // When it is the CC_OK_Handler then it makes sense to remove the channel else we will leak memory.
-        // We do it from the Close Handler as we are sending the OK back to the client.
-        // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException
-        // will send a close-ok.. Where we should call removeChannel.
-        // However, due to the poor exception handling on the client. The client-user will be notified of the
-        // InvalidArgument and if they then decide to close the session/connection then the there will be time
-        // for that to occur i.e. a new close method be sent before the exception handling can mark the session closed.
-
         _closingChannelsList.remove(channelId);
     }
 
@@ -952,7 +964,7 @@ public class AMQProtocolEngine implement
      */
     public void removeChannel(int channelId)
     {
-        AMQChannel<AMQProtocolEngine> session;
+        AMQChannel session;
         synchronized (_channelMap)
         {
             session = _channelMap.remove(channelId);
@@ -988,7 +1000,7 @@ public class AMQProtocolEngine implement
      */
     private void closeAllChannels()
     {
-        for (AMQChannel<AMQProtocolEngine> channel : getChannels())
+        for (AMQChannel channel : getChannels())
         {
             channel.close();
         }
@@ -1003,7 +1015,6 @@ public class AMQProtocolEngine implement
     }
 
     /** This must be called when the session is _closed in order to free up any resources managed by the session. */
-    @Override
     public void closeSession()
     {
 
@@ -1103,16 +1114,14 @@ public class AMQProtocolEngine implement
 
 
     void closeConnection(AMQConstant errorCode,
-                         String message, int channelId,
-                         int classId,
-                         int methodId)
+                         String message, int channelId)
     {
 
         if (_logger.isInfoEnabled())
         {
             _logger.info("Closing connection due to: " + message);
         }
-        closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), classId, methodId)));
+        closeConnection(channelId, new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), errorCode.getCode(), AMQShortString.validValueOf(message), _methodProcessor.getClassId(), _methodProcessor.getMethodId())));
     }
 
     private void closeConnection(int channelId, AMQFrame frame)
@@ -1137,7 +1146,6 @@ public class AMQProtocolEngine implement
 
     }
 
-    @Override
     public void closeProtocolSession()
     {
         _network.close();
@@ -1520,7 +1528,7 @@ public class AMQProtocolEngine implement
         return String.valueOf(getRemoteAddress());
     }
 
-    public void closeSession(AMQChannel<AMQProtocolEngine> session, AMQConstant cause, String message)
+    public void closeSession(AMQChannel session, AMQConstant cause, String message)
     {
         int channelId = session.getChannelId();
         closeChannel(channelId, cause, message);
@@ -1549,7 +1557,7 @@ public class AMQProtocolEngine implement
             if(!_blocking)
             {
                 _blocking = true;
-                for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
+                for(AMQChannel channel : _channelMap.values())
                 {
                     channel.block();
                 }
@@ -1564,7 +1572,7 @@ public class AMQProtocolEngine implement
             if(_blocking)
             {
                 _blocking = false;
-                for(AMQChannel<AMQProtocolEngine> channel : _channelMap.values())
+                for(AMQChannel channel : _channelMap.values())
                 {
                     channel.unblock();
                 }
@@ -1577,9 +1585,9 @@ public class AMQProtocolEngine implement
         return _closed;
     }
 
-    public List<AMQChannel<AMQProtocolEngine>> getSessionModels()
+    public List<AMQChannel> getSessionModels()
     {
-		return new ArrayList<AMQChannel<AMQProtocolEngine>>(getChannels());
+		return new ArrayList<AMQChannel>(getChannels());
     }
 
     public LogSubject getLogSubject()
@@ -1715,31 +1723,26 @@ public class AMQProtocolEngine implement
         return _lastWriteTime.get();
     }
 
-    @Override
     public boolean isCloseWhenNoRoute()
     {
         return _closeWhenNoRoute;
     }
 
-    @Override
     public boolean isCompressionSupported()
     {
         return _compressionSupported && _broker.isMessageCompressionEnabled();
     }
 
-    @Override
     public int getMessageCompressionThreshold()
     {
         return _messageCompressionThreshold;
     }
 
-    @Override
     public Broker<?> getBroker()
     {
         return _broker;
     }
 
-    @Override
     public SubjectCreator getSubjectCreator()
     {
         return _broker.getSubjectCreator(getLocalAddress(), getTransport().isSecure());

Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java?rev=1631137&r1=1631136&r2=1631137&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ChannelMethodProcessor.java Sat Oct 11 23:46:39 2014
@@ -20,6 +20,90 @@
  */
 package org.apache.qpid.server.protocol.v0_8;
 
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+
 public interface ChannelMethodProcessor
 {
+    void receiveAccessRequest(AMQShortString realm,
+                              boolean exclusive,
+                              boolean passive,
+                              boolean active,
+                              boolean write,
+                              boolean read);
+
+    void receiveBasicAck(long deliveryTag, boolean multiple);
+
+    void receiveBasicCancel(AMQShortString consumerTag, boolean nowait);
+
+    void receiveBasicConsume(AMQShortString queue,
+                             AMQShortString consumerTag,
+                             boolean noLocal,
+                             boolean noAck,
+                             boolean exclusive,
+                             boolean nowait,
+                             FieldTable arguments);
+
+    void receiveBasicGet(AMQShortString queue, boolean noAck);
+
+    void receiveBasicPublish(AMQShortString exchange,
+                             AMQShortString routingKey,
+                             boolean mandatory,
+                             boolean immediate);
+
+    void receiveBasicQos(long prefetchSize, int prefetchCount, boolean global);
+
+    void receiveBasicRecover(boolean requeue, boolean sync);
+
+    void receiveBasicReject(long deliveryTag, boolean requeue);
+
+    void receiveChannelClose();
+
+    void receiveChannelCloseOk();
+
+    void receiveChannelFlow(boolean active);
+
+    void receiveExchangeBound(AMQShortString exchange, AMQShortString queue, AMQShortString routingKey);
+
+    void receiveExchangeDeclare(AMQShortString exchange,
+                                AMQShortString type,
+                                boolean passive,
+                                boolean durable,
+                                boolean autoDelete,
+                                boolean internal,
+                                boolean nowait,
+                                FieldTable arguments);
+
+    void receiveExchangeDelete(AMQShortString exchange, boolean ifUnused, boolean nowait);
+
+    void receiveQueueBind(AMQShortString queue,
+                          AMQShortString exchange,
+                          AMQShortString routingKey,
+                          boolean nowait,
+                          FieldTable arguments);
+
+    void receiveQueueDeclare(AMQShortString queueStr,
+                             boolean passive,
+                             boolean durable,
+                             boolean exclusive,
+                             boolean autoDelete,
+                             boolean nowait,
+                             FieldTable arguments);
+
+    void receiveQueueDelete(AMQShortString queue, boolean ifUnused, boolean ifEmpty, boolean nowait);
+
+    void receiveQueuePurge(AMQShortString queue, boolean nowait);
+
+    void receiveQueueUnbind(AMQShortString queue,
+                            AMQShortString exchange,
+                            AMQShortString routingKey,
+                            FieldTable arguments);
+
+    void receiveTxSelect();
+
+    void receiveTxCommit();
+
+    void receiveTxRollback();
+
+
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org