You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/11/24 22:14:23 UTC
svn commit: r597918 [8/12] - in /incubator/qpid/branches/M2.1: gentools/
gentools/lib/ gentools/src/org/apache/qpid/gentools/ java/
java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/handler/ java/broker/...
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Sat Nov 24 13:14:14 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ChannelCloseBody;
import org.apache.qpid.framing.ChannelCloseOkBody;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -47,16 +48,16 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- ChannelCloseBody body = evt.getMethod();
+
if (_logger.isInfoEnabled())
{
- _logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
- " and method " + body.methodId);
+ _logger.info("Received channel close for id " + channelId + " citing class " + body.getClassId() +
+ " and method " + body.getMethodId());
}
- int channelId = evt.getChannelId();
+
AMQChannel channel = session.getChannel(channelId);
@@ -69,10 +70,8 @@
// Client requested closure so we don't wait for ok we send it
stateManager.getProtocolSession().closeChannelOk(channelId);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
- session.writeFrame(response);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ ChannelCloseOkBody responseBody = methodRegistry.createChannelCloseOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java Sat Nov 24 13:14:14 2007
@@ -42,9 +42,9 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ChannelCloseOkBody body, int channelId) throws AMQException
{
- int channelId = evt.getChannelId();
+
_logger.info("Received channel-close-ok for channel-id " + channelId);
// Let the Protocol Session know the channel is now closed.
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Sat Nov 24 13:14:14 2007
@@ -22,9 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ChannelFlowBody;
-import org.apache.qpid.framing.ChannelFlowOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -46,27 +44,23 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ChannelFlowBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- ChannelFlowBody body = evt.getMethod();
- AMQChannel channel = session.getChannel(evt.getChannelId());
+
+ AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
- channel.setSuspended(!body.active);
- _logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
+ channel.setSuspended(!body.getActive());
+ _logger.debug("Channel.Flow for channel " + channelId + ", active=" + body.getActive());
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- body.active); // active
- session.writeFrame(response);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createChannelFlowOkBody(body.getActive());
+ session.writeFrame(responseBody.generateFrame(channelId));
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Sat Nov 24 13:14:14 2007
@@ -20,11 +20,17 @@
*/
package org.apache.qpid.server.handler;
+import java.util.UUID;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
@@ -44,18 +50,55 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ChannelOpenBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
- final AMQChannel channel = new AMQChannel(session,evt.getChannelId(), virtualHost.getMessageStore(),
+ final AMQChannel channel = new AMQChannel(session,channelId, virtualHost.getMessageStore(),
virtualHost.getExchangeRegistry());
session.addChannel(channel);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- session.writeFrame(response);
+
+ ChannelOpenOkBody response;
+
+ ProtocolVersion pv = session.getProtocolVersion();
+
+ if(pv.equals(ProtocolVersion.v8_0))
+ {
+ MethodRegistry_8_0 methodRegistry = (MethodRegistry_8_0) MethodRegistry.getMethodRegistry(ProtocolVersion.v8_0);
+ response = methodRegistry.createChannelOpenOkBody();
+
+ }
+ else if(pv.equals(ProtocolVersion.v0_9))
+ {
+ MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_9);
+ UUID uuid = UUID.randomUUID();
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(output);
+ try
+ {
+ dataOut.writeLong(uuid.getMostSignificantBits());
+ dataOut.writeLong(uuid.getLeastSignificantBits());
+ dataOut.flush();
+ dataOut.close();
+ }
+ catch (IOException e)
+ {
+ // This *really* shouldn't happen as we're not doing any I/O
+ throw new RuntimeException("I/O exception when writing to byte array", e);
+ }
+
+ // should really associate this channelId to the session
+ byte[] channelName = output.toByteArray();
+
+ response = methodRegistry.createChannelOpenOkBody(channelName);
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, "Got channel open for protocol version not catered for: " + pv, null);
+ }
+
+
+ session.writeFrame(response.generateFrame(channelId));
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -25,6 +25,7 @@
import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.ConnectionCloseOkBody;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQStateManager;
@@ -45,14 +46,14 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionCloseBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- final ConnectionCloseBody body = evt.getMethod();
+
if (_logger.isInfoEnabled())
{
- _logger.info("ConnectionClose received with reply code/reply text " + body.replyCode + "/" +
- body.replyText + " for " + session);
+ _logger.info("ConnectionClose received with reply code/reply text " + body.getReplyCode() + "/" +
+ body.getReplyText() + " for " + session);
}
try
{
@@ -62,10 +63,10 @@
{
_logger.error("Error closing protocol session: " + e, e);
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
- session.writeFrame(response);
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ ConnectionCloseOkBody responseBody = methodRegistry.createConnectionCloseOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -44,7 +44,7 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionCloseOkBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
//todo should this not do more than just log the method?
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -21,10 +21,7 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionOpenBody;
-import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -56,20 +53,20 @@
return new AMQShortString(Long.toString(System.currentTimeMillis()));
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionOpenBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- ConnectionOpenBody body = evt.getMethod();
+
//ignore leading '/'
String virtualHostName;
- if ((body.virtualHost != null) && body.virtualHost.charAt(0) == '/')
+ if ((body.getVirtualHost() != null) && body.getVirtualHost().charAt(0) == '/')
{
- virtualHostName = new StringBuilder(body.virtualHost.subSequence(1, body.virtualHost.length())).toString();
+ virtualHostName = new StringBuilder(body.getVirtualHost().subSequence(1, body.getVirtualHost().length())).toString();
}
else
{
- virtualHostName = body.virtualHost == null ? null : String.valueOf(body.virtualHost);
+ virtualHostName = body.getVirtualHost() == null ? null : String.valueOf(body.getVirtualHost());
}
VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
@@ -105,14 +102,14 @@
session.setContextKey(generateClientID());
}
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short) 0,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- body.virtualHost);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+
stateManager.changeState(AMQState.CONNECTION_OPEN);
- session.writeFrame(response);
+
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+
}
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -25,11 +25,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionSecureOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -57,10 +53,10 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionSecureOkBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- ConnectionSecureOkBody body = evt.getMethod();
+
//fixme Vhost not defined yet
//session.getVirtualHost().getAuthenticationManager();
@@ -71,8 +67,8 @@
{
throw new AMQException("No SASL context set up in session");
}
-
- AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
switch (authResult.status)
{
case ERROR:
@@ -80,45 +76,34 @@
// throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
_logger.info("Authentication failed");
stateManager.changeState(AMQState.CONNECTION_CLOSING);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- ConnectionCloseBody.getClazz((byte)8, (byte)0), // classId
- ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- AMQConstant.NOT_ALLOWED.getName()); // replyText
- session.writeFrame(close);
+
+
+ ConnectionCloseBody connectionCloseBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(),
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ session.writeFrame(connectionCloseBody.generateFrame(0) );
disposeSaslServer(session);
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- // TODO: Check the value of channelMax here: This should be the max
- // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
- // not Integer.MAX_VALUE (which is signed 4 bytes).
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- Integer.MAX_VALUE, // channelMax
- ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
- HeartbeatConfig.getInstance().getDelay()); // heartbeat
- session.writeFrame(tune);
+
+ ConnectionTuneBody tuneBody =
+ methodRegistry.createConnectionTuneBody(0xFFFF,
+ ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
+ HeartbeatConfig.getInstance().getDelay());
+ session.writeFrame(tuneBody.generateFrame(0));
session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
disposeSaslServer(session);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
- (byte)8, (byte)0, // AMQP version (major, minor)
- authResult.challenge); // challenge
- session.writeFrame(challenge);
+
+ ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+ session.writeFrame(secureBody.generateFrame(0));
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -26,11 +26,7 @@
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionSecureBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneBody;
-import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -53,7 +49,7 @@
private static final int DEFAULT_FRAME_SIZE = 65536;
- public static StateAwareMethodListener<ConnectionStartOkBody> getInstance()
+ public static ConnectionStartOkMethodHandler getInstance()
{
return _instance;
}
@@ -62,51 +58,51 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionStartOkBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- final ConnectionStartOkBody body = evt.getMethod();
- _logger.info("SASL Mechanism selected: " + body.mechanism);
- _logger.info("Locale selected: " + body.locale);
+
+ _logger.info("SASL Mechanism selected: " + body.getMechanism());
+ _logger.info("Locale selected: " + body.getLocale());
AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();//session.getVirtualHost().getAuthenticationManager();
SaslServer ss = null;
try
{
- ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
+ ss = authMgr.createSaslServer(String.valueOf(body.getMechanism()), session.getLocalFQDN());
if (ss == null)
{
- throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.mechanism
+ throw body.getConnectionException(AMQConstant.RESOURCE_ERROR, "Unable to create SASL Server:" + body.getMechanism()
);
}
session.setSaslServer(ss);
- AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
+ AuthenticationResult authResult = authMgr.authenticate(ss, body.getResponse());
//save clientProperties
if (session.getClientProperties() == null)
{
- session.setClientProperties(body.clientProperties);
+ session.setClientProperties(body.getClientProperties());
}
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+
switch (authResult.status)
{
case ERROR:
_logger.info("Authentication failed");
stateManager.changeState(AMQState.CONNECTION_CLOSING);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- ConnectionCloseBody.getClazz((byte) 8, (byte) 0), // classId
- ConnectionCloseBody.getMethod((byte) 8, (byte) 0), // methodId
- AMQConstant.NOT_ALLOWED.getCode(), // replyCode
- AMQConstant.NOT_ALLOWED.getName()); // replyText
- session.writeFrame(close);
+
+ ConnectionCloseBody closeBody =
+ methodRegistry.createConnectionCloseBody(AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName(),
+ body.getClazz(),
+ body.getMethod());
+
+ session.writeFrame(closeBody.generateFrame(0));
disposeSaslServer(session);
break;
@@ -115,25 +111,17 @@
session.setAuthorizedID(new UsernamePrincipal(ss.getAuthorizationID()));
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- Integer.MAX_VALUE, // channelMax
- getConfiguredFrameSize(), // frameMax
- HeartbeatConfig.getInstance().getDelay()); // heartbeat
- session.writeFrame(tune);
+
+ ConnectionTuneBody tuneBody = methodRegistry.createConnectionTuneBody(0xFFFF,
+ getConfiguredFrameSize(),
+ HeartbeatConfig.getInstance().getDelay());
+ session.writeFrame(tuneBody.generateFrame(0));
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- authResult.challenge); // challenge
- session.writeFrame(challenge);
+
+ ConnectionSecureBody secureBody = methodRegistry.createConnectionSecureBody(authResult.challenge);
+ session.writeFrame(secureBody.generateFrame(0));
}
}
catch (SaslException e)
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Sat Nov 24 13:14:14 2007
@@ -40,15 +40,15 @@
return _instance;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ConnectionTuneOkBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
- ConnectionTuneOkBody body = evt.getMethod();
+
if (_logger.isDebugEnabled())
{
_logger.debug(body);
}
stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
- session.initHeartbeats(body.heartbeat);
+ session.initHeartbeats(body.getHeartbeat());
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Sat Nov 24 13:14:14 2007
@@ -21,10 +21,8 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ExchangeBoundBody;
-import org.apache.qpid.framing.ExchangeBoundOkBody;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -36,6 +34,8 @@
/**
* @author Apache Software Foundation
+ *
+ *
*/
public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
{
@@ -64,35 +64,32 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ExchangeBoundBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
+
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
+ MethodRegistry methodRegistry = session.getMethodRegistry();
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- byte major = (byte)8;
- byte minor = (byte)0;
- ExchangeBoundBody body = evt.getMethod();
- AMQShortString exchangeName = body.exchange;
- AMQShortString queueName = body.queue;
- AMQShortString routingKey = body.routingKey;
+
+ AMQShortString exchangeName = body.getExchange();
+ AMQShortString queueName = body.getQueue();
+ AMQShortString routingKey = body.getRoutingKey();
if (exchangeName == null)
{
throw new AMQException("Exchange exchange must not be null");
}
Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
- AMQFrame response;
+ ExchangeBoundOkBody response;
if (exchange == null)
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- EXCHANGE_NOT_FOUND, // replyCode
- new AMQShortString("Exchange " + exchangeName + " not found")); // replyText
+
+
+ response = methodRegistry.createExchangeBoundOkBody(EXCHANGE_NOT_FOUND,
+ new AMQShortString("Exchange " + exchangeName + " not found"));
}
else if (routingKey == null)
{
@@ -100,18 +97,12 @@
{
if (exchange.hasBindings())
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
- null); // replyText
+ response = methodRegistry.createExchangeBoundOkBody(OK, null);
}
else
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- NO_BINDINGS, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_BINDINGS, // replyCode
null); // replyText
}
}
@@ -121,28 +112,22 @@
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- QUEUE_NOT_FOUND, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
new AMQShortString("Queue " + queueName + " not found")); // replyText
}
else
{
if (exchange.isBound(queue))
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
null); // replyText
}
else
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- QUEUE_NOT_BOUND, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_BOUND, // replyCode
new AMQShortString("Queue " + queueName + " not bound to exchange " + exchangeName)); // replyText
}
}
@@ -153,53 +138,43 @@
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- QUEUE_NOT_FOUND, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(QUEUE_NOT_FOUND, // replyCode
new AMQShortString("Queue " + queueName + " not found")); // replyText
}
else
{
- if (exchange.isBound(body.routingKey, queue))
+ if (exchange.isBound(body.getRoutingKey(), queue))
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
null); // replyText
}
else
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
new AMQShortString("Queue " + queueName + " not bound with routing key " +
- body.routingKey + " to exchange " + exchangeName)); // replyText
+ body.getRoutingKey() + " to exchange " + exchangeName)); // replyText
}
}
}
else
{
- if (exchange.isBound(body.routingKey))
+ if (exchange.isBound(body.getRoutingKey()))
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- OK, // replyCode
+
+ response = methodRegistry.createExchangeBoundOkBody(OK, // replyCode
null); // replyText
}
else
{
- // AMQP version change: Be aware of possible changes to parameter order as versions change.
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- major, minor, // AMQP version (major, minor)
- NO_QUEUE_BOUND_WITH_RK, // replyCode
- new AMQShortString("No queue bound with routing key " + body.routingKey +
+
+ response = methodRegistry.createExchangeBoundOkBody(NO_QUEUE_BOUND_WITH_RK, // replyCode
+ new AMQShortString("No queue bound with routing key " + body.getRoutingKey() +
" to exchange " + exchangeName)); // replyText
}
}
- session.writeFrame(response);
+ session.writeFrame(response.generateFrame(channelId));
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Sat Nov 24 13:14:14 2007
@@ -24,9 +24,7 @@
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ExchangeDeclareBody;
-import org.apache.qpid.framing.ExchangeDeclareOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
@@ -54,61 +52,60 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ExchangeDeclareBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
- final ExchangeDeclareBody body = evt.getMethod();
+
if (_logger.isDebugEnabled())
{
- _logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
+ _logger.debug("Request to declare exchange of type " + body.getType() + " with name " + body.getExchange());
}
synchronized(exchangeRegistry)
{
- Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+ Exchange exchange = exchangeRegistry.getExchange(body.getExchange());
if (exchange == null)
{
- if(body.passive && ((body.type == null) || body.type.length() ==0))
+ if(body.getPassive() && ((body.getType() == null) || body.getType().length() ==0))
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.exchange);
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Unknown exchange: " + body.getExchange());
}
else
{
try
{
- exchange = exchangeFactory.createExchange(body.exchange == null ? null : body.exchange.intern(),
- body.type == null ? null : body.type.intern(),
- body.durable,
- body.passive, body.ticket);
+ exchange = exchangeFactory.createExchange(body.getExchange() == null ? null : body.getExchange().intern(),
+ body.getType() == null ? null : body.getType().intern(),
+ body.getDurable(),
+ body.getPassive(), body.getTicket());
exchangeRegistry.registerExchange(exchange);
}
catch(AMQUnknownExchangeType e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.exchange,e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID, "Unknown exchange: " + body.getExchange(),e);
}
}
}
- else if (!exchange.getType().equals(body.type))
+ else if (!exchange.getType().equals(body.getType()))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED, "Attempt to redeclare exchange: " + body.getExchange() + " of type " + exchange.getType() + " to " + body.getType() +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
}
}
- if(!body.nowait)
+ if(!body.getNowait())
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- session.writeFrame(response);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createExchangeDeclareOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
}
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Sat Nov 24 13:14:14 2007
@@ -45,21 +45,20 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, ExchangeDeleteBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
- ExchangeDeleteBody body = evt.getMethod();
+
try
{
- exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
- session.writeFrame(response);
+ exchangeRegistry.unregisterExchange(body.getExchange(), body.getIfUnused());
+
+ ExchangeDeleteOkBody responseBody = session.getMethodRegistry().createExchangeDeleteOkBody();
+
+ session.writeFrame(responseBody.generateFrame(channelId));
}
catch (ExchangeInUseException e)
{
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Sat Nov 24 13:14:14 2007
@@ -23,9 +23,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidRoutingKeyException;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.QueueBindBody;
-import org.apache.qpid.framing.QueueBindOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
@@ -53,22 +51,24 @@
{
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueBindBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, QueueBindBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- final QueueBindBody body = evt.getMethod();
+
final AMQQueue queue;
- if (body.queue == null)
+ final AMQShortString routingKey;
+
+ if (body.getQueue() == null)
{
- AMQChannel channel = session.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
queue = channel.getDefaultQueue();
@@ -78,41 +78,42 @@
throw body.getChannelException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null");
}
- if (body.routingKey == null)
+ if (body.getRoutingKey() == null)
+ {
+ routingKey = queue.getName();
+ }
+ else
{
- body.routingKey = queue.getName();
+ routingKey = body.getRoutingKey().intern();
}
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
+ routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern();
}
if (queue == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
- final Exchange exch = exchangeRegistry.getExchange(body.exchange);
+ final Exchange exch = exchangeRegistry.getExchange(body.getExchange());
if (exch == null)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.exchange + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist.");
}
- if (body.routingKey != null)
- {
- body.routingKey = body.routingKey.intern();
- }
try
{
- if (!exch.isBound(body.routingKey, body.arguments, queue))
+ if (!exch.isBound(routingKey, body.getArguments(), queue))
{
- queue.bind(body.routingKey, body.arguments, exch);
+ queue.bind(routingKey, body.getArguments(), exch);
}
}
catch (AMQInvalidRoutingKeyException rke)
{
- throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, body.routingKey.toString());
+ throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString());
}
catch (AMQException e)
{
@@ -121,15 +122,14 @@
if (_log.isInfoEnabled())
{
- _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + body.routingKey);
+ _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey);
}
- if (!body.nowait)
+ if (!body.getNowait())
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte) 8, (byte) 0);
- session.writeFrame(response);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueueBindOkBody();
+ session.writeFrame(responseBody.generateFrame(channelId));
+
}
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Sat Nov 24 13:14:14 2007
@@ -28,10 +28,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.configuration.Configured;
import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.QueueDeclareBody;
-import org.apache.qpid.framing.QueueDeclareOkBody;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.configuration.Configurator;
@@ -70,7 +67,7 @@
Configurator.configure(this);
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, QueueDeclareBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
@@ -78,12 +75,20 @@
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MessageStore store = virtualHost.getMessageStore();
- QueueDeclareBody body = evt.getMethod();
+
+
+
+ final AMQShortString queueName;
// if we aren't given a queue name, we create one which we return to the client
- if (body.queue == null)
+
+ if (body.getQueue() == null)
{
- body.queue = createName();
+ queueName = createName();
+ }
+ else
+ {
+ queueName = body.getQueue().intern();
}
AMQQueue queue;
@@ -94,16 +99,12 @@
- if (((queue = queueRegistry.getQueue(body.queue)) == null))
+ if (((queue = queueRegistry.getQueue(queueName)) == null))
{
- if(body.queue != null)
- {
- body.queue = body.queue.intern();
- }
- if (body.passive)
+ if (body.getPassive())
{
- String msg = "Queue: " + body.queue + " not found on VirtualHost(" + virtualHost + ").";
+ String msg = "Queue: " + queueName + " not found on VirtualHost(" + virtualHost + ").";
throw body.getChannelException(AMQConstant.NOT_FOUND, msg);
}
else
@@ -118,42 +119,40 @@
{
Exchange defaultExchange = exchangeRegistry.getDefaultExchange();
- queue.bind(body.queue, null, defaultExchange);
- _log.info("Queue " + body.queue + " bound to default exchange(" + defaultExchange.getName() + ")");
+ queue.bind(queueName, null, defaultExchange);
+ _log.info("Queue " + queueName + " bound to default exchange(" + defaultExchange.getName() + ")");
}
}
}
else if (queue.getOwner() != null && !session.getContextKey().equals(queue.getOwner()))
{
- throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + body.queue + "'),"
+ throw body.getChannelException(AMQConstant.ALREADY_EXISTS, "Cannot declare queue('" + queueName + "'),"
+ " as exclusive queue with same name "
+ "declared on another client ID('"
+ queue.getOwner() + "')");
}
- AMQChannel channel = session.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
//set this as the default queue on the channel:
channel.setDefaultQueue(queue);
}
- if (!body.nowait)
+ if (!body.getNowait())
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- queue.getConsumerCount(), // consumerCount
- queue.getMessageCount(), // messageCount
- body.queue); // queue
- _log.info("Queue " + body.queue + " declared successfully");
- session.writeFrame(response);
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ QueueDeclareOkBody responseBody =
+ methodRegistry.createQueueDeclareOkBody(queueName,
+ queue.getMessageCount(),
+ queue.getConsumerCount());
+ session.writeFrame(responseBody.generateFrame(channelId));
+
+ _log.info("Queue " + queueName + " declared successfully");
}
}
@@ -166,11 +165,11 @@
throws AMQException
{
final QueueRegistry registry = virtualHost.getQueueRegistry();
- AMQShortString owner = body.exclusive ? session.getContextKey() : null;
- final AMQQueue queue = new AMQQueue(body.queue, body.durable, owner, body.autoDelete, virtualHost);
+ AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+ final AMQQueue queue = new AMQQueue(body.getQueue(), body.getDurable(), owner, body.getAutoDelete(), virtualHost);
final AMQShortString queueName = queue.getName();
- if (body.exclusive && !body.durable)
+ if (body.getExclusive() && !body.getDurable())
{
final AMQProtocolSession.Task deleteQueueTask =
new AMQProtocolSession.Task()
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Sat Nov 24 13:14:14 2007
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueueDeleteBody;
import org.apache.qpid.framing.QueueDeleteOkBody;
+import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -56,22 +57,21 @@
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueueDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, QueueDeleteBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
MessageStore store = virtualHost.getMessageStore();
- QueueDeleteBody body = evt.getMethod();
AMQQueue queue;
- if (body.queue == null)
+ if (body.getQueue() == null)
{
- AMQChannel channel = session.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(channelId);
if (channel == null)
{
- throw body.getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
//get the default queue on the channel:
@@ -79,43 +79,40 @@
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if (queue == null)
{
if (_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
}
else
{
- if (body.ifEmpty && !queue.isEmpty())
+ if (body.getIfEmpty() && !queue.isEmpty())
{
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is not empty.");
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is not empty.");
}
- else if (body.ifUnused && !queue.isUnused())
+ else if (body.getIfUnused() && !queue.isUnused())
{
// TODO - Error code
- throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.queue + " is still used.");
+ throw body.getChannelException(AMQConstant.IN_USE, "Queue: " + body.getQueue() + " is still used.");
}
else
{
- int purged = queue.delete(body.ifUnused, body.ifEmpty);
+ int purged = queue.delete(body.getIfUnused(), body.getIfEmpty());
if (queue.isDurable())
{
store.removeQueue(queue.getName());
}
-
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
- (byte) 8, (byte) 0, // AMQP version (major, minor)
- purged)); // messageCount
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ QueueDeleteOkBody responseBody = methodRegistry.createQueueDeleteOkBody(purged);
+ session.writeFrame(responseBody.generateFrame(channelId));
}
}
}
Modified: incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java?rev=597918&r1=597917&r2=597918&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/broker/src/main/java/org/apache/qpid/server/handler/QueuePurgeHandler.java Sat Nov 24 13:14:14 2007
@@ -24,6 +24,8 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.QueuePurgeBody;
import org.apache.qpid.framing.QueuePurgeOkBody;
+import org.apache.qpid.framing.MethodRegistry;
+import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.protocol.AMQProtocolSession;
@@ -55,22 +57,22 @@
_failIfNotFound = failIfNotFound;
}
- public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<QueuePurgeBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, QueuePurgeBody body, int channelId) throws AMQException
{
AMQProtocolSession session = stateManager.getProtocolSession();
VirtualHost virtualHost = session.getVirtualHost();
QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- AMQChannel channel = session.getChannel(evt.getChannelId());
+ AMQChannel channel = session.getChannel(channelId);
+
- QueuePurgeBody body = evt.getMethod();
AMQQueue queue;
- if(body.queue == null)
+ if(body.getQueue() == null)
{
if (channel == null)
{
- throw body.getChannelNotFoundException(evt.getChannelId());
+ throw body.getChannelNotFoundException(channelId);
}
//get the default queue on the channel:
@@ -86,14 +88,14 @@
}
else
{
- queue = queueRegistry.getQueue(body.queue);
+ queue = queueRegistry.getQueue(body.getQueue());
}
if(queue == null)
{
if(_failIfNotFound)
{
- throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.queue + " does not exist.");
+ throw body.getChannelException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist.");
}
}
else
@@ -101,14 +103,13 @@
long purged = queue.clearQueue(channel.getStoreContext());
- if(!body.nowait)
+ if(!body.getNowait())
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- session.writeFrame(QueuePurgeOkBody.createAMQFrame(evt.getChannelId(),
- (byte)8, (byte)0, // AMQP version (major, minor)
- purged)); // messageCount
+
+ MethodRegistry methodRegistry = session.getMethodRegistry();
+ AMQMethodBody responseBody = methodRegistry.createQueuePurgeOkBody(purged);
+ session.writeFrame(responseBody.generateFrame(channelId));
+
}
}
}