You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2014/09/28 17:22:10 UTC
svn commit: r1628074 [2/6] - in
/qpid/branches/QPID-6125-ProtocolRefactoring/java:
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/
broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/...
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicConsumeMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -36,7 +36,6 @@ import org.apache.qpid.server.filter.AMQ
import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -56,16 +55,16 @@ public class BasicConsumeMethodHandler i
{
}
- public void methodReceived(AMQStateManager stateManager, BasicConsumeBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicConsumeBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
-
- AMQChannel channel = protocolConnection.getChannel(channelId);
- VirtualHostImpl<?,?,?> vHost = protocolConnection.getVirtualHost();
+ AMQChannel channel = connection.getChannel(channelId);
+ VirtualHostImpl<?,?,?> vHost = connection.getVirtualHost();
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
else
{
@@ -119,12 +118,12 @@ public class BasicConsumeMethodHandler i
if (queueName != null)
{
String msg = "No such queue, '" + queueName + "'";
- throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, msg, connection.getMethodRegistry());
}
else
{
String msg = "No queue name provided, no default queue defined.";
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg);
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, msg, connection.getMethodRegistry());
}
}
else
@@ -153,9 +152,9 @@ public class BasicConsumeMethodHandler i
body.getNoLocal());
if (!body.getNowait())
{
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createBasicConsumeOkBody(consumerTag);
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
@@ -163,12 +162,12 @@ public class BasicConsumeMethodHandler i
{
AMQShortString msg = AMQShortString.validValueOf("Non-unique consumer tag, '" + body.getConsumerTag() + "'");
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
msg, // replytext
body.getClazz(),
body.getMethod());
- protocolConnection.writeFrame(responseBody.generateFrame(0));
+ connection.writeFrame(responseBody.generateFrame(0));
}
}
@@ -176,12 +175,12 @@ public class BasicConsumeMethodHandler i
{
_logger.debug("Closing connection due to invalid selector");
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelCloseBody(AMQConstant.ARGUMENT_INVALID.getCode(),
AMQShortString.validValueOf(ise.getMessage()),
body.getClazz(),
body.getMethod());
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
@@ -190,28 +189,28 @@ public class BasicConsumeMethodHandler i
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
- + " as it already has an existing exclusive consumer");
+ + " as it already has an existing exclusive consumer", connection.getMethodRegistry());
}
catch (AMQQueue.ExistingConsumerPreventsExclusive e)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
- + " exclusively as it already has a consumer");
+ + " exclusively as it already has a consumer", connection.getMethodRegistry());
}
catch (AccessControlException e)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
- + " permission denied");
+ + " permission denied", connection.getMethodRegistry());
}
catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
"Cannot subscribe to queue "
+ queue.getName()
- + " as it already has an incompatible exclusivity policy");
+ + " as it already has an incompatible exclusivity policy", connection.getMethodRegistry());
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -45,7 +45,6 @@ import org.apache.qpid.server.protocol.v
import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod;
import org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8;
import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -65,17 +64,17 @@ public class BasicGetMethodHandler imple
{
}
- public void methodReceived(AMQStateManager stateManager, BasicGetBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicGetBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession protocolConnection = stateManager.getProtocolSession();
+ VirtualHostImpl vHost = connection.getVirtualHost();
- VirtualHostImpl vHost = protocolConnection.getVirtualHost();
-
- AMQChannel channel = protocolConnection.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
else
{
@@ -87,12 +86,12 @@ public class BasicGetMethodHandler imple
if(body.getQueue()!=null)
{
throw body.getConnectionException(AMQConstant.NOT_FOUND,
- "No such queue, '" + body.getQueue()+ "'");
+ "No such queue, '" + body.getQueue()+ "'", connection.getMethodRegistry());
}
else
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "No queue name provided, no default queue defined.");
+ "No queue name provided, no default queue defined.", connection.getMethodRegistry());
}
}
else
@@ -100,36 +99,37 @@ public class BasicGetMethodHandler imple
try
{
- if (!performGet(queue,protocolConnection, channel, !body.getNoAck()))
+ if (!performGet(queue,connection, channel, !body.getNoAck()))
{
- MethodRegistry methodRegistry = protocolConnection.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
// TODO - set clusterId
BasicGetEmptyBody responseBody = methodRegistry.createBasicGetEmptyBody(null);
- protocolConnection.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
catch (AccessControlException e)
{
throw body.getConnectionException(AMQConstant.ACCESS_REFUSED,
- e.getMessage());
+ e.getMessage(), connection.getMethodRegistry());
}
catch (MessageSource.ExistingExclusiveConsumer e)
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue has an exclusive consumer");
+ "Queue has an exclusive consumer", connection.getMethodRegistry());
}
catch (MessageSource.ExistingConsumerPreventsExclusive e)
{
throw body.getConnectionException(AMQConstant.INTERNAL_ERROR,
"The GET request has been evaluated as an exclusive consumer, " +
- "this is likely due to a programming error in the Qpid broker");
+ "this is likely due to a programming error in the Qpid broker",
+ connection.getMethodRegistry());
}
catch (MessageSource.ConsumerAccessRefused consumerAccessRefused)
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
- "Queue has an incompatible exclusivit policy");
+ "Queue has an incompatible exclusivit policy", connection.getMethodRegistry());
}
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicPublishMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -30,12 +32,9 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class BasicPublishMethodHandler implements StateAwareMethodListener<BasicPublishBody>
{
private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class);
@@ -52,16 +51,17 @@ public class BasicPublishMethodHandler i
{
}
- public void methodReceived(AMQStateManager stateManager, BasicPublishBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicPublishBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
if (_logger.isDebugEnabled())
{
_logger.debug("Publish received on channel " + channelId);
}
AMQShortString exchangeName = body.getExchange();
- VirtualHostImpl vHost = session.getVirtualHost();
+ VirtualHostImpl vHost = connection.getVirtualHost();
// TODO: check the delivery tag field details - is it unique across the broker or per subscriber?
@@ -79,21 +79,22 @@ public class BasicPublishMethodHandler i
// if the exchange does not exist we raise a channel exception
if (destination == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange name",
+ connection.getMethodRegistry());
}
else
{
// The partially populated BasicDeliver frame plus the received route body
// is stored in the channel. Once the final body frame has been received
// it is routed to the exchange.
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
- MessagePublishInfo info = session.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
+ MessagePublishInfo info = connection.getMethodRegistry().getProtocolVersionMethodConverter().convertToInfo(body);
info.setExchange(exchangeName);
try
{
@@ -101,7 +102,7 @@ public class BasicPublishMethodHandler i
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicQosHandler.java Sun Sep 28 15:22:03 2014
@@ -26,7 +26,6 @@ import org.apache.qpid.framing.BasicQosB
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicQosHandler implements StateAwareMethodListener<BasicQosBody>
@@ -38,21 +37,22 @@ public class BasicQosHandler implements
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, BasicQosBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicQosBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
channel.setCredit(body.getPrefetchSize(), body.getPrefetchCount());
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createBasicQosOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.framing.ProtocolV
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicRecoverMethodHandler implements StateAwareMethodListener<BasicRecoverBody>
@@ -43,29 +42,29 @@ public class BasicRecoverMethodHandler i
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, BasicRecoverBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicRecoverBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
-
- _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
- AMQChannel channel = session.getChannel(channelId);
+ _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
- if(session.getProtocolVersion().equals(ProtocolVersion.v8_0))
+ if(connection.getProtocolVersion().equals(ProtocolVersion.v8_0))
{
- MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) session.getMethodRegistry();
+ MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) connection.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverOkBody();
channel.sync();
- session.writeFrame(recoverOk.generateFrame(channelId));
+ connection.writeFrame(recoverOk.generateFrame(channelId));
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRecoverSyncMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -31,7 +31,6 @@ import org.apache.qpid.framing.amqp_0_9.
import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicRecoverSyncMethodHandler implements StateAwareMethodListener<BasicRecoverSyncBody>
@@ -45,35 +44,36 @@ public class BasicRecoverSyncMethodHandl
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, BasicRecoverSyncBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicRecoverSyncBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- _logger.debug("Recover received on protocol session " + session + " and channel " + channelId);
- AMQChannel channel = session.getChannel(channelId);
+ _logger.debug("Recover received on protocol session " + connection + " and channel " + channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
channel.resend();
// Qpid 0-8 hacks a synchronous -ok onto recover.
// In Qpid 0-9 we create a separate sync-recover, sync-recover-ok pair to be "more" compliant
- if(session.getProtocolVersion().equals(ProtocolVersion.v0_9))
+ if(connection.getProtocolVersion().equals(ProtocolVersion.v0_9))
{
- MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry();
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) connection.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- session.writeFrame(recoverOk.generateFrame(channelId));
+ connection.writeFrame(recoverOk.generateFrame(channelId));
}
- else if(session.getProtocolVersion().equals(ProtocolVersion.v0_91))
+ else if(connection.getProtocolVersion().equals(ProtocolVersion.v0_91))
{
- MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) session.getMethodRegistry();
+ MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) connection.getMethodRegistry();
AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
- session.writeFrame(recoverOk.generateFrame(channelId));
+ connection.writeFrame(recoverOk.generateFrame(channelId));
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicRejectMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -27,8 +27,6 @@ import org.apache.qpid.framing.BasicReje
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicRejectBody>
@@ -46,15 +44,16 @@ public class BasicRejectMethodHandler im
{
}
- public void methodReceived(AMQStateManager stateManager, BasicRejectBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ BasicRejectBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
if (_logger.isDebugEnabled())
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseHandler.java Sun Sep 28 15:22:03 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.framing.MethodReg
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
@@ -47,9 +46,10 @@ public class ChannelCloseHandler impleme
{
}
- public void methodReceived(AMQStateManager stateManager, ChannelCloseBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ChannelCloseBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
if (_logger.isInfoEnabled())
{
@@ -58,19 +58,19 @@ public class ChannelCloseHandler impleme
}
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel");
+ throw body.getConnectionException(AMQConstant.CHANNEL_ERROR, "Trying to close unknown channel", connection.getMethodRegistry());
}
channel.sync();
- session.closeChannel(channelId);
+ connection.closeChannel(channelId);
// Client requested closure so we don't wait for ok we send it
- stateManager.getProtocolSession().closeChannelOk(channelId);
+ connection.closeChannelOk(channelId);
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelCloseOkHandler.java Sun Sep 28 15:22:03 2014
@@ -24,7 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ChannelCloseOkBody;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
+import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
@@ -42,12 +42,14 @@ public class ChannelCloseOkHandler imple
{
}
- public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ChannelCloseOkBody body,
+ int channelId) throws AMQException
{
_logger.info("Received channel-close-ok for channel-id " + channelId);
// Let the Protocol Session know the channel is now closed.
- stateManager.getProtocolSession().closeChannelOk(channelId);
+ connection.closeChannelOk(channelId);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelFlowHandler.java Sun Sep 28 15:22:03 2014
@@ -28,7 +28,6 @@ import org.apache.qpid.framing.ChannelFl
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody>
@@ -46,23 +45,24 @@ public class ChannelFlowHandler implemen
{
}
- public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ChannelFlowBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- AMQChannel channel = session.getChannel(channelId);
+ AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
channel.setSuspended(!body.getActive());
_logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ChannelOpenHandler.java Sun Sep 28 15:22:03 2014
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.UUID;
+
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
@@ -33,16 +38,10 @@ import org.apache.qpid.framing.amqp_8_0.
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.io.ByteArrayOutputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.UUID;
-
public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody>
{
private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class);
@@ -58,10 +57,11 @@ public class ChannelOpenHandler implemen
{
}
- public void methodReceived(AMQStateManager stateManager, ChannelOpenBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ChannelOpenBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = session.getVirtualHost();
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
// Protect the broker against out of order frame request.
if (virtualHost == null)
@@ -70,13 +70,13 @@ public class ChannelOpenHandler implemen
}
_logger.info("Connecting to: " + virtualHost.getName());
- final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore());
+ final AMQChannel channel = new AMQChannel(connection,channelId, virtualHost.getMessageStore());
- session.addChannel(channel);
+ connection.addChannel(channel);
ChannelOpenOkBody response;
- ProtocolVersion pv = session.getProtocolVersion();
+ ProtocolVersion pv = connection.getProtocolVersion();
if(pv.equals(ProtocolVersion.v8_0))
{
@@ -138,6 +138,6 @@ public class ChannelOpenHandler implemen
}
- session.writeFrame(response.generateFrame(channelId));
+ connection.writeFrame(response.generateFrame(channelId));
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -27,7 +27,6 @@ import org.apache.qpid.framing.Connectio
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
@@ -45,28 +44,29 @@ public class ConnectionCloseMethodHandle
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionCloseBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
if (_logger.isInfoEnabled())
{
_logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
- body.getReplyText() + " for " + session);
+ body.getReplyText() + " for " + connection);
}
try
{
- session.closeSession();
+ connection.closeSession();
}
catch (Exception e)
{
_logger.error("Error closing protocol session: " + e, e);
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
- session.closeProtocolSession();
+ connection.closeProtocolSession();
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionCloseOkMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -26,7 +26,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody>
@@ -44,16 +43,17 @@ public class ConnectionCloseOkMethodHand
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionCloseOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionCloseOkBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
//todo should this not do more than just log the method?
_logger.info("Received Connection-close-ok");
try
{
- stateManager.changeState(AMQState.CONNECTION_CLOSED);
- session.closeSession();
+ connection.changeState(AMQState.CONNECTION_CLOSED);
+ connection.closeSession();
}
catch (Exception e)
{
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionOpenMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -34,7 +34,6 @@ import org.apache.qpid.server.model.Stat
import org.apache.qpid.server.model.port.AmqpPort;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -58,9 +57,10 @@ public class ConnectionOpenMethodHandler
return new AMQShortString(Long.toString(System.currentTimeMillis()));
}
- public void methodReceived(AMQStateManager stateManager, ConnectionOpenBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionOpenBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
//ignore leading '/'
String virtualHostName;
@@ -73,42 +73,44 @@ public class ConnectionOpenMethodHandler
virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
}
- VirtualHostImpl virtualHost = ((AmqpPort)stateManager.getProtocolSession().getPort()).getVirtualHost(virtualHostName);
+ VirtualHostImpl virtualHost = ((AmqpPort)connection.getPort()).getVirtualHost(virtualHostName);
if (virtualHost == null)
{
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'");
+ throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown virtual host: '" + virtualHostName + "'",
+ connection.getMethodRegistry());
}
else
{
// Check virtualhost access
if (virtualHost.getState() != State.ACTIVE)
{
- throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active");
+ throw body.getConnectionException(AMQConstant.CONNECTION_FORCED, "Virtual host '" + virtualHost.getName() + "' is not active",
+ connection.getMethodRegistry());
}
- session.setVirtualHost(virtualHost);
+ connection.setVirtualHost(virtualHost);
try
{
- virtualHost.getSecurityManager().authoriseCreateConnection(session);
+ virtualHost.getSecurityManager().authoriseCreateConnection(connection);
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
// See Spec (0.8.2). Section 3.1.2 Virtual Hosts
- if (session.getContextKey() == null)
+ if (connection.getContextKey() == null)
{
- session.setContextKey(generateClientID());
+ connection.setContextKey(generateClientID());
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
- stateManager.changeState(AMQState.CONNECTION_OPEN);
+ connection.changeState(AMQState.CONNECTION_OPEN);
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionSecureOkMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
@@ -56,19 +55,20 @@ public class ConnectionSecureOkMethodHan
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionSecureOkBody body,
+ int channelId) throws AMQException
{
- Broker<?> broker = stateManager.getBroker();
- AMQProtocolSession session = stateManager.getProtocolSession();
+ Broker<?> broker = connection.getBroker();
- SubjectCreator subjectCreator = stateManager.getSubjectCreator();
+ SubjectCreator subjectCreator = connection.getSubjectCreator();
- SaslServer ss = session.getSaslServer();
+ SaslServer ss = connection.getSaslServer();
if (ss == null)
{
throw new AMQException("No SASL context set up in session");
}
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
switch (authResult.getStatus())
{
@@ -78,7 +78,7 @@ public class ConnectionSecureOkMethodHan
_logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
// This should be abstracted
- stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ connection.changeState(AMQState.CONNECTION_CLOSING);
ConnectionCloseBody connectionCloseBody =
methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
@@ -86,15 +86,15 @@ public class ConnectionSecureOkMethodHan
body.getClazz(),
body.getMethod());
- session.writeFrame(connectionCloseBody.generateFrame(0));
- disposeSaslServer(session);
+ connection.writeFrame(connectionCloseBody.generateFrame(0));
+ disposeSaslServer(connection);
break;
case SUCCESS:
if (_logger.isInfoEnabled())
{
_logger.info("Connected as: " + authResult.getSubject());
}
- stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+ connection.changeState(AMQState.CONNECTION_NOT_TUNED);
int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
@@ -107,15 +107,15 @@ public class ConnectionSecureOkMethodHan
methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
frameMax,
broker.getConnection_heartBeatDelay());
- session.writeFrame(tuneBody.generateFrame(0));
- session.setAuthorizedSubject(authResult.getSubject());
- disposeSaslServer(session);
+ connection.writeFrame(tuneBody.generateFrame(0));
+ connection.setAuthorizedSubject(authResult.getSubject());
+ disposeSaslServer(connection);
break;
case CONTINUE:
- stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+ connection.changeState(AMQState.CONNECTION_NOT_AUTH);
ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- session.writeFrame(secureBody.generateFrame(0));
+ connection.writeFrame(secureBody.generateFrame(0));
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionStartOkMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -35,7 +35,6 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.security.SubjectCreator;
import org.apache.qpid.server.security.auth.SubjectAuthenticationResult;
@@ -56,32 +55,36 @@ public class ConnectionStartOkMethodHand
{
}
- public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionStartOkBody body,
+ int channelId) throws AMQException
{
- Broker<?> broker = stateManager.getBroker();
- AMQProtocolSession session = stateManager.getProtocolSession();
+ Broker<?> broker = connection.getBroker();
_logger.info("SASL Mechanism selected: " + body.getMechanism());
_logger.info("Locale selected: " + body.getLocale());
- SubjectCreator subjectCreator = stateManager.getSubjectCreator();
+ SubjectCreator subjectCreator = connection.getSubjectCreator();
SaslServer ss = null;
try
{
- ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN(), session.getPeerPrincipal());
+ ss = subjectCreator.createSaslServer(String.valueOf(body.getMechanism()),
+ connection.getLocalFQDN(),
+ connection.getPeerPrincipal());
if (ss == null)
{
- throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism());
+ throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism(),
+ connection.getMethodRegistry());
}
- session.setSaslServer(ss);
+ connection.setSaslServer(ss);
final SubjectAuthenticationResult authResult = subjectCreator.authenticate(ss, body.getResponse());
//save clientProperties
- session.setClientProperties(body.getClientProperties());
+ connection.setClientProperties(body.getClientProperties());
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
switch (authResult.getStatus())
{
@@ -90,7 +93,7 @@ public class ConnectionStartOkMethodHand
_logger.info("Authentication failed:" + (cause == null ? "" : cause.getMessage()));
- stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ connection.changeState(AMQState.CONNECTION_CLOSING);
ConnectionCloseBody closeBody =
methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
@@ -98,8 +101,8 @@ public class ConnectionStartOkMethodHand
body.getClazz(),
body.getMethod());
- session.writeFrame(closeBody.generateFrame(0));
- disposeSaslServer(session);
+ connection.writeFrame(closeBody.generateFrame(0));
+ disposeSaslServer(connection);
break;
case SUCCESS:
@@ -107,9 +110,9 @@ public class ConnectionStartOkMethodHand
{
_logger.info("Connected as: " + authResult.getSubject());
}
- session.setAuthorizedSubject(authResult.getSubject());
+ connection.setAuthorizedSubject(authResult.getSubject());
- stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
+ connection.changeState(AMQState.CONNECTION_NOT_TUNED);
int frameMax = broker.getContextValue(Integer.class, Broker.BROKER_FRAME_SIZE);
if(frameMax <= 0)
@@ -120,18 +123,18 @@ public class ConnectionStartOkMethodHand
ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(broker.getConnection_sessionCountLimit(),
frameMax,
broker.getConnection_heartBeatDelay());
- session.writeFrame(tuneBody.generateFrame(0));
+ connection.writeFrame(tuneBody.generateFrame(0));
break;
case CONTINUE:
- stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
+ connection.changeState(AMQState.CONNECTION_NOT_AUTH);
ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.getChallenge());
- session.writeFrame(secureBody.generateFrame(0));
+ connection.writeFrame(secureBody.generateFrame(0));
}
}
catch (SaslException e)
{
- disposeSaslServer(session);
+ disposeSaslServer(connection);
throw new AMQException("SASL error: " + e, e);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ConnectionTuneOkMethodHandler.java Sun Sep 28 15:22:03 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.protocol.v0_8.state.AMQState;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody>
@@ -43,19 +42,20 @@ public class ConnectionTuneOkMethodHandl
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ConnectionTuneOkBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
if (_logger.isDebugEnabled())
{
_logger.debug(body);
}
- stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+ connection.changeState(AMQState.CONNECTION_NOT_OPENED);
- session.initHeartbeats(body.getHeartbeat());
+ connection.initHeartbeats(body.getHeartbeat());
- int brokerFrameMax = stateManager.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE);
+ int brokerFrameMax = connection.getBroker().getContextValue(Integer.class,Broker.BROKER_FRAME_SIZE);
if(brokerFrameMax <= 0)
{
brokerFrameMax = Integer.MAX_VALUE;
@@ -68,7 +68,7 @@ public class ConnectionTuneOkMethodHandl
+ "greater than the broker will allow: "
+ brokerFrameMax,
body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ connection.getMethodRegistry(),null);
}
else if(body.getFrameMax() > 0 && body.getFrameMax() < AMQConstant.FRAME_MIN_SIZE.getCode())
{
@@ -77,13 +77,13 @@ public class ConnectionTuneOkMethodHandl
+ "which is smaller than the specification definined minimum: "
+ AMQConstant.FRAME_MIN_SIZE.getCode(),
body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ connection.getMethodRegistry(),null);
}
int frameMax = body.getFrameMax() == 0 ? brokerFrameMax : (int) body.getFrameMax();
- session.setMaxFrameSize(frameMax);
+ connection.setMaxFrameSize(frameMax);
long maxChannelNumber = body.getChannelMax();
//0 means no implied limit, except that forced by protocol limitations (0xFFFF)
- session.setMaximumNumberOfChannels( maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
+ connection.setMaximumNumberOfChannels(maxChannelNumber == 0 ? 0xFFFFL : maxChannelNumber);
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeBoundHandler.java Sun Sep 28 15:22:03 2014
@@ -29,7 +29,6 @@ import org.apache.qpid.server.exchange.E
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -65,16 +64,17 @@ public class ExchangeBoundHandler implem
{
}
- public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ExchangeBoundBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = session.getVirtualHost();
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
- final AMQChannel channel = session.getChannel(channelId);
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
@@ -227,7 +227,7 @@ public class ExchangeBoundHandler implem
}
}
}
- session.writeFrame(response.generateFrame(channelId));
+ connection.writeFrame(response.generateFrame(channelId));
}
protected boolean isDefaultExchange(final AMQShortString exchangeName)
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeclareHandler.java Sun Sep 28 15:22:03 2014
@@ -41,7 +41,6 @@ import org.apache.qpid.server.model.NoFa
import org.apache.qpid.server.model.UnknownConfiguredObjectException;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.ExchangeExistsException;
import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
@@ -62,14 +61,15 @@ public class ExchangeDeclareHandler impl
{
}
- public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ExchangeDeclareBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = session.getVirtualHost();
- final AMQChannel channel = session.getChannel(channelId);
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
final AMQShortString exchangeName = body.getExchange();
@@ -89,7 +89,7 @@ public class ExchangeDeclareHandler impl
+ ExchangeDefaults.DIRECT_EXCHANGE_CLASS
+ " to " + body.getType() +".",
body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ connection.getMethodRegistry(),null);
}
}
else
@@ -99,14 +99,15 @@ public class ExchangeDeclareHandler impl
exchange = virtualHost.getExchange(exchangeName.toString());
if(exchange == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + exchangeName,
+ connection.getMethodRegistry());
}
else if (!(body.getType() == null || body.getType().length() ==0) && !exchange.getType().equals(body.getType().asString()))
{
throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " +
exchangeName + " of type " + exchange.getType()
- + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor(),null);
+ + " to " + body.getType() +".",body.getClazz(), body.getMethod(),connection.getMethodRegistry(),null);
}
}
@@ -139,7 +140,7 @@ public class ExchangeDeclareHandler impl
{
throw body.getConnectionException(AMQConstant.NOT_ALLOWED,
"Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix.");
+ " which begins with reserved prefix.", connection.getMethodRegistry());
}
catch(ExchangeExistsException e)
@@ -147,40 +148,44 @@ public class ExchangeDeclareHandler impl
exchange = e.getExistingExchange();
if(!new AMQShortString(exchange.getType()).equals(body.getType()))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
- + exchangeName + " of type "
- + exchange.getType()
- + " to " + body.getType() +".",
- body.getClazz(), body.getMethod(),
- body.getMajor(), body.getMinor(),null);
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type "
+ + exchange.getType()
+ + " to " + body.getType() + ".",
+ connection.getMethodRegistry());
}
}
catch(NoFactoryForTypeException e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + exchangeName,e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange type '"+e.getType()+"' for exchange '" + exchangeName + "'", connection.getMethodRegistry());
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
catch (UnknownConfiguredObjectException e)
{
// note - since 0-8/9/9-1 can't set the alt. exchange this exception should never occur
- throw body.getConnectionException(AMQConstant.NOT_FOUND, "Unknown alternate exchange",e);
+ throw body.getConnectionException(AMQConstant.NOT_FOUND,
+ "Unknown alternate exchange "
+ + (e.getName() != null
+ ? "name: \"" + e.getName() + "\""
+ : "id: " + e.getId()),
+ connection.getMethodRegistry());
}
catch (IllegalArgumentException e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange",e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Error creating exchange '"+exchangeName+"': " + e.getMessage(),connection.getMethodRegistry());
}
}
}
if(!body.getNowait())
{
- MethodRegistry methodRegistry = session.getMethodRegistry();
+ MethodRegistry methodRegistry = connection.getMethodRegistry();
AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
channel.sync();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
}
Modified: qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java?rev=1628074&r1=1628073&r2=1628074&view=diff
==============================================================================
--- qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java (original)
+++ qpid/branches/QPID-6125-ProtocolRefactoring/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/ExchangeDeleteHandler.java Sun Sep 28 15:22:03 2014
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.protocol.v0_8.handler;
+import java.security.AccessControlException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeDeleteBody;
@@ -28,14 +30,11 @@ import org.apache.qpid.protocol.AMQConst
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
-import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager;
import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import java.security.AccessControlException;
-
public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
{
private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler();
@@ -49,14 +48,15 @@ public class ExchangeDeleteHandler imple
{
}
- public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException
+ public void methodReceived(final AMQProtocolSession<?> connection,
+ ExchangeDeleteBody body,
+ int channelId) throws AMQException
{
- AMQProtocolSession session = stateManager.getProtocolSession();
- VirtualHostImpl virtualHost = session.getVirtualHost();
- final AMQChannel channel = session.getChannel(channelId);
+ VirtualHostImpl virtualHost = connection.getVirtualHost();
+ final AMQChannel channel = connection.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(channelId);
+ throw body.getChannelNotFoundException(channelId, connection.getMethodRegistry());
}
channel.sync();
try
@@ -64,7 +64,7 @@ public class ExchangeDeleteHandler imple
if(isDefaultExchange(body.getExchange()))
{
- throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted");
+ throw body.getConnectionException(AMQConstant.NOT_ALLOWED, "Default Exchange cannot be deleted", connection.getMethodRegistry());
}
final String exchangeName = body.getExchange().toString();
@@ -72,28 +72,31 @@ public class ExchangeDeleteHandler imple
final ExchangeImpl exchange = virtualHost.getExchange(exchangeName);
if(exchange == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange());
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "No such exchange: " + body.getExchange(),
+ connection.getMethodRegistry());
}
virtualHost.removeExchange(exchange, !body.getIfUnused());
- ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
+ ExchangeDeleteOkBody responseBody = connection.getMethodRegistry().createExchangeDeleteOkBody();
- session.writeFrame(responseBody.generateFrame(channelId));
+ connection.writeFrame(responseBody.generateFrame(channelId));
}
catch (ExchangeIsAlternateException e)
{
- throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange");
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange in use as an alternate exchange",
+ connection.getMethodRegistry());
}
catch (RequiredExchangeException e)
{
- throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted");
+ throw body.getChannelException(AMQConstant.NOT_ALLOWED, "Exchange '"+body.getExchange()+"' cannot be deleted",
+ connection.getMethodRegistry());
}
catch (AccessControlException e)
{
- throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage());
+ throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, e.getMessage(), connection.getMethodRegistry());
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org