You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kp...@apache.org on 2007/02/14 21:02:22 UTC
svn commit: r507672 [4/16] - in /incubator/qpid/branches/qpid.0-9:
gentools/src/org/apache/qpid/gentools/ gentools/templ.java/
gentools/templ.net/ java/ java/broker/ java/broker/bin/
java/broker/distribution/ java/broker/distribution/src/ java/broker/d...
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/NoConsumerFilter.java Wed Feb 14 12:02:03 2007
@@ -20,15 +20,9 @@
*/
package org.apache.qpid.server.filter;
-import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.filter.jms.selector.SelectorParser;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQInvalidSelectorException;
import org.apache.log4j.Logger;
-
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
public class NoConsumerFilter implements MessageFilter
{
@@ -36,7 +30,7 @@
public NoConsumerFilter() throws AMQException
- {
+ {
_logger.info("Created NoConsumerFilter");
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java Wed Feb 14 12:02:03 2007
@@ -21,19 +21,12 @@
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
-import java.io.IOException;
+//import java.io.IOException;
import java.util.HashMap;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-
-//import org.apache.activemq.command.ActiveMQDestination;
-//import org.apache.activemq.command.Message;
-//import org.apache.activemq.command.TransactionId;
-//import org.apache.activemq.util.JMSExceptionSupport;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.message.jms.JMSMessage;
import org.apache.qpid.AMQException;
import org.apache.log4j.Logger;
@@ -44,41 +37,11 @@
*/
public class PropertyExpression implements Expression
{
-
- interface SubExpression
- {
- public Object evaluate(AMQMessage message);
- }
-
- interface JMSExpression
- {
- public abstract Object evaluate(JMSMessage message);
- }
-
- static class SubJMSExpression implements SubExpression
- {
- JMSExpression _expression;
-
- SubJMSExpression(JMSExpression expression)
- {
- _expression = expression;
- }
-
-
- public Object evaluate(AMQMessage message)
- {
- JMSMessage msg = (JMSMessage) message.getDecodedMessage(AMQMessage.JMS_MESSAGE);
- if (msg != null)
- {
- return _expression.evaluate(msg);
- }
- else
- {
- return null;
- }
- }
- }
-
+ // Constants - defined the same as JMS
+ private static final int NON_PERSISTENT = 1;
+ private static final int PERSISTENT = 2;
+ private static final int DEFAULT_PRIORITY = 4;
+
private final static Logger _logger = org.apache.log4j.Logger.getLogger(PropertyExpression.class);
@@ -86,62 +49,43 @@
static
{
- JMS_PROPERTY_EXPRESSIONS.put("JMSDestination", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDestination",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSDestination();
+ return message.getDestination();
}
}
- ));
-//
-// public Object evaluate(AMQMessage message)
-// {
-// //fixme
-//
-//
-//// AMQDestination dest = message.getOriginalDestination();
-//// if (dest == null)
-//// {
-//// dest = message.getDestination();
-//// }
-//// if (dest == null)
-//// {
-//// return null;
-//// }
-//// return dest.toString();
-// return "";
-// }
-// });
- JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo", new SubJMSExpression(
- new JMSExpression()
+ );
+ JMS_PROPERTY_EXPRESSIONS.put("JMSReplyTo",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSReplyTo();
+ return message.getReplyTo();
}
- })
+ }
);
- JMS_PROPERTY_EXPRESSIONS.put("JMSType", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSType",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSType();
+ return message.getType();
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSDeliveryMode",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
try
{
- Integer mode = new Integer(message.getAMQMessage().isPersistent() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ int mode = message.isPersistent() ? PERSISTENT : NON_PERSISTENT;
_logger.info("JMSDeliveryMode is :" + mode);
return mode;
}
@@ -150,83 +94,83 @@
//shouldn't happen
}
- return DeliveryMode.NON_PERSISTENT;
+ return NON_PERSISTENT;
}
- }));
+ });
- JMS_PROPERTY_EXPRESSIONS.put("JMSPriority", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSPriority",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSPriority();
+ return message.getPriority();
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("AMQMessageID",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getAMQMessage().getMessageId();
+ return message.getMessageId();
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSTimestamp",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSTimestamp();
+ return message.getTimestamp();
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSCorrelationID",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSCorrelationID();
+ return message.getCorrelationId();
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getJMSExpiration();
+ return message.getExpiration();
}
}
- ));
+ );
- JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new SubJMSExpression(
- new JMSExpression()
+ JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered",
+ new Expression()
{
- public Object evaluate(JMSMessage message)
+ public Object evaluate(AMQMessage message)
{
- return message.getAMQMessage().isRedelivered();
+ return message.isRedelivered();
}
}
- ));
+ );
}
- private final String name;
- private final SubExpression jmsPropertyExpression;
+ private final AMQShortString name;
+ private final Expression jmsPropertyExpression;
- public PropertyExpression(String name)
+ public PropertyExpression(AMQShortString name)
{
this.name = name;
- jmsPropertyExpression = (SubExpression) JMS_PROPERTY_EXPRESSIONS.get(name);
+ jmsPropertyExpression = (Expression) JMS_PROPERTY_EXPRESSIONS.get(name);
}
- public Object evaluate(AMQMessage message) throws JMSException
+ public Object evaluate(AMQMessage message) throws AMQException
{
// try
// {
@@ -252,21 +196,21 @@
}
// catch (IOException ioe)
// {
-// JMSException exception = new JMSException("Could not get property: " + name + " reason: " + ioe.getMessage());
+// AMQException exception = new AMQException("Could not get property: " + name + " reason: " + ioe.getMessage());
// exception.initCause(ioe);
// throw exception;
// }
// }
// catch (IOException e)
// {
-// JMSException exception = new JMSException(e.getMessage());
+// AMQException exception = new AMQException(e.getMessage());
// exception.initCause(e);
// throw exception;
// }
}
- public String getName()
+ public AMQShortString getName()
{
return name;
}
@@ -277,7 +221,7 @@
*/
public String toString()
{
- return name;
+ return name.asString();
}
/**
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/SimpleFilterManager.java Wed Feb 14 12:02:03 2007
@@ -20,16 +20,15 @@
*/
package org.apache.qpid.server.filter;
+import org.apache.log4j.Logger;
import org.apache.qpid.server.queue.AMQMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.qpid.AMQException;
-import javax.jms.JMSException;
import java.util.concurrent.ConcurrentLinkedQueue;
public class SimpleFilterManager implements FilterManager
{
- private final Logger _logger = LoggerFactory.getLogger(SimpleFilterManager.class);
+ private final Logger _logger = Logger.getLogger(SimpleFilterManager.class);
private final ConcurrentLinkedQueue<MessageFilter> _filters;
@@ -60,10 +59,10 @@
return false;
}
}
- catch (JMSException e)
+ catch (AMQException e)
{
//fixme
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ e.printStackTrace();
return false;
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/UnaryExpression.java Wed Feb 14 12:02:03 2007
@@ -20,8 +20,8 @@
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
-import org.apache.qpid.server.message.jms.JMSMessage;
import java.math.BigDecimal;
import java.util.Collection;
@@ -29,8 +29,6 @@
import java.util.Iterator;
import java.util.List;
-import javax.jms.JMSException;
-
/**
* An expression which performs an operation on two expression values
*
@@ -43,7 +41,8 @@
public static Expression createNegate(Expression left) {
return new UnaryExpression(left) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException
+ {
Object rvalue = right.evaluate(message);
if (rvalue == null) {
return null;
@@ -74,7 +73,7 @@
final Collection inList = t;
return new BooleanUnaryExpression(right) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object rvalue = right.evaluate(message);
if (rvalue == null) {
@@ -126,7 +125,7 @@
super(left);
}
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
@@ -135,7 +134,7 @@
public static BooleanExpression createNOT(BooleanExpression left) {
return new BooleanUnaryExpression(left) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Boolean lvalue = (Boolean) right.evaluate(message);
if (lvalue == null) {
return null;
@@ -159,7 +158,7 @@
public static BooleanExpression createBooleanCast(Expression left) {
return new BooleanUnaryExpression(left) {
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
Object rvalue = right.evaluate(message);
if (rvalue == null)
return null;
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XPathExpression.java Wed Feb 14 12:02:03 2007
@@ -20,18 +20,14 @@
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
-import java.io.IOException;
-import java.lang.reflect.Constructor;
-import java.lang.reflect.InvocationTargetException;
-
-import javax.jms.JMSException;
-
-//import org.apache.activemq.command.Message;
-//import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+
/**
* Used to evaluate an XPath Expression in a JMS selector.
*/
@@ -75,7 +71,7 @@
private final XPathEvaluator evaluator;
static public interface XPathEvaluator {
- public boolean evaluate(AMQMessage message) throws JMSException;
+ public boolean evaluate(AMQMessage message) throws AMQException;
}
XPathExpression(String xpath) {
@@ -97,7 +93,7 @@
}
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
// try {
//FIXME this is flow to disk work
// if( message.isDropped() )
@@ -120,9 +116,10 @@
/**
* @param message
* @return true if the expression evaluates to Boolean.TRUE.
- * @throws JMSException
+ * @throws AMQException
*/
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException
+ {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XQueryExpression.java Wed Feb 14 12:02:03 2007
@@ -17,9 +17,9 @@
*/
package org.apache.qpid.server.filter;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
-import javax.jms.JMSException;
//
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
@@ -35,7 +35,7 @@
this.xpath = xpath;
}
- public Object evaluate(AMQMessage message) throws JMSException {
+ public Object evaluate(AMQMessage message) throws AMQException {
return Boolean.FALSE;
}
@@ -46,9 +46,10 @@
/**
* @param message
* @return true if the expression evaluates to Boolean.TRUE.
- * @throws JMSException
+ * @throws AMQException
*/
- public boolean matches(AMQMessage message) throws JMSException {
+ public boolean matches(AMQMessage message) throws AMQException
+ {
Object object = evaluate(message);
return object!=null && object==Boolean.TRUE;
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/filter/XalanXPathEvaluator.java Wed Feb 14 12:02:03 2007
@@ -21,23 +21,18 @@
// Based on like named file from r450141 of the Apache ActiveMQ project <http://www.activemq.org/site/home.html>
//
-import java.io.StringReader;
-import java.io.ByteArrayInputStream;
-
-import javax.jms.BytesMessage;
-import javax.jms.JMSException;
-import javax.jms.TextMessage;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
-
-//import org.apache.activemq.command.Message;
-//import org.apache.activemq.util.ByteArrayInputStream;
-import org.apache.xpath.CachedXPathAPI;
+import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.xpath.CachedXPathAPI;
import org.w3c.dom.Document;
import org.w3c.dom.traversal.NodeIterator;
import org.xml.sax.InputSource;
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import java.io.ByteArrayInputStream;
+import java.io.StringReader;
+
public class XalanXPathEvaluator implements XPathExpression.XPathEvaluator {
private final String xpath;
@@ -46,17 +41,24 @@
this.xpath = xpath;
}
- public boolean evaluate(AMQMessage m) throws JMSException {
+ public boolean evaluate(AMQMessage m) throws AMQException
+ {
+ // TODO - we would have to check the content type and then evaluate the content
+ // here... is this really a feature we wish to implement? - RobG
+ /*
+
if( m instanceof TextMessage ) {
String text = ((TextMessage)m).getText();
- return evaluate(text);
+ return evaluate(text);
} else if ( m instanceof BytesMessage ) {
BytesMessage bm = (BytesMessage) m;
byte data[] = new byte[(int) bm.getBodyLength()];
bm.readBytes(data);
return evaluate(data);
- }
+ }
+ */
return false;
+
}
private boolean evaluate(byte[] data) {
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Wed Feb 14 12:02:03 2007
@@ -20,18 +20,15 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ChannelCloseBody;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
public class ChannelCloseHandler implements StateAwareMethodListener<ChannelCloseBody>
{
private static final Logger _logger = Logger.getLogger(ChannelCloseHandler.class);
@@ -43,16 +40,14 @@
return _instance;
}
- private ChannelCloseHandler()
- {
- }
+ private ChannelCloseHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseBody> evt) throws AMQException
{
- ChannelCloseBody body = evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final ChannelCloseBody body = evt.getMethod();
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " +
body.classId + " and method " + body.methodId);
- protocolSession.closeChannelResponse(evt.getChannelId(), evt.getRequestId());
+ session.closeChannelResponse(evt.getChannelId(), evt.getRequestId());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseOkHandler.java Wed Feb 14 12:02:03 2007
@@ -23,11 +23,10 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ChannelCloseOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+
import org.apache.log4j.Logger;
public class ChannelCloseOkHandler implements StateAwareMethodListener<ChannelCloseOkBody>
@@ -41,14 +40,12 @@
return _instance;
}
- private ChannelCloseOkHandler()
- {
- }
+ private ChannelCloseOkHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelCloseOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
_logger.info("Received channel-close-ok for channel-id " + evt.getChannelId());
- protocolSession.removeChannel(evt.getChannelId());
+ session.removeChannel(evt.getChannelId());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Wed Feb 14 12:02:03 2007
@@ -20,18 +20,17 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
-import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ChannelFlowBody;
import org.apache.qpid.framing.ChannelFlowOkBody;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQMethodEvent;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
+import org.apache.log4j.Logger;
public class ChannelFlowHandler implements StateAwareMethodListener<ChannelFlowBody>
{
@@ -44,24 +43,21 @@
return _instance;
}
- private ChannelFlowHandler()
- {
- }
+ private ChannelFlowHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelFlowBody> evt) throws AMQException
{
- ChannelFlowBody body = evt.getMethod();
-
- AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final ChannelFlowBody body = evt.getMethod();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
channel.setSuspended(!body.active);
_logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody response = ChannelFlowOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor(), // AMQP minor version
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion(), // AMQP minor version
body.active); // active
- protocolSession.writeResponse(evt, response);
+ session.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Wed Feb 14 12:02:03 2007
@@ -26,16 +26,17 @@
import org.apache.qpid.framing.ChannelOpenOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.registry.IApplicationRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
public class ChannelOpenHandler implements StateAwareMethodListener<ChannelOpenBody>
{
+ //private static final Logger _logger = Logger.getLogger(ChannelOpenHandler.class);
+
private static ChannelOpenHandler _instance = new ChannelOpenHandler();
public static ChannelOpenHandler getInstance()
@@ -43,19 +44,18 @@
return _instance;
}
- private ChannelOpenHandler()
- {
- }
+ private ChannelOpenHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ChannelOpenBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ VirtualHost virtualHost = session.getVirtualHost();
// Be aware of possible changes to parameter order as versions change.
// XXX: Client id
AMQMethodBody response = ChannelOpenOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor(), // AMQP minor version
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion(), // AMQP minor version
"XXX".getBytes());
- protocolSession.writeResponse(evt, response);
+ session.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -20,16 +20,14 @@
*/
package org.apache.qpid.server.handler;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionCloseBody;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQStateManager;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.server.state.AMQStateManager;
+import org.apache.qpid.server.state.StateAwareMethodListener;
+
import org.apache.log4j.Logger;
public class ConnectionCloseMethodHandler implements StateAwareMethodListener<ConnectionCloseBody>
@@ -43,16 +41,14 @@
return _instance;
}
- private ConnectionCloseMethodHandler()
- {
- }
+ private ConnectionCloseMethodHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionCloseBody body = evt.getMethod();
_logger.info("ConnectionClose received with reply code/reply text " + body.replyCode +
- "/" + body.replyText + " for " + protocolSession);
- protocolSession.closeSessionResponse(evt.getRequestId());
+ "/" + body.replyText + " for " + session);
+ session.closeSessionResponse(evt.getRequestId());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -23,12 +23,10 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionCloseOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.state.AMQState;
+
import org.apache.log4j.Logger;
public class ConnectionCloseOkMethodHandler implements StateAwareMethodListener<ConnectionCloseOkBody>
@@ -42,14 +40,12 @@
return _instance;
}
- private ConnectionCloseOkMethodHandler()
- {
- }
+ private ConnectionCloseOkMethodHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionCloseOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
_logger.info("Received Connection-close-ok");
- protocolSession.closeSession();
+ session.closeSession();
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -22,18 +22,23 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ConnectionOpenBody;
import org.apache.qpid.framing.ConnectionOpenOkBody;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
public class ConnectionOpenMethodHandler implements StateAwareMethodListener<ConnectionOpenBody>
{
+ //private static final Logger _logger = Logger.getLogger(ConnectionOpenMethodHandler.class);
+
private static ConnectionOpenMethodHandler _instance = new ConnectionOpenMethodHandler();
public static ConnectionOpenMethodHandler getInstance()
@@ -41,32 +46,50 @@
return _instance;
}
- private ConnectionOpenMethodHandler()
- {
- }
+ private ConnectionOpenMethodHandler() {}
- private static String generateClientID()
+ private static AMQShortString generateClientID()
{
- return Long.toString(System.currentTimeMillis());
+ return new AMQShortString(Long.toString(System.currentTimeMillis()));
}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionOpenBody> evt) throws AMQException
{
- ConnectionOpenBody body = evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final ConnectionOpenBody body = evt.getMethod();
+
+ //ignore leading '/'
+ String virtualHostName;
+ if((body.virtualHost != null) && body.virtualHost.charAt(0) == '/')
+ {
+ virtualHostName = new StringBuilder(body.virtualHost.subSequence(1,body.virtualHost.length())).toString();
+ }
+ else
+ {
+ virtualHostName = String.valueOf(body.virtualHost);
+ }
+
+ VirtualHost virtualHost = stateManager.getVirtualHostRegistry().getVirtualHost(virtualHostName);
+
+ if(virtualHost == null)
+ {
+ throw body.getConnectionException(AMQConstant.NOT_FOUND.getCode(), "Unknown virtual host: " + virtualHostName);
+ }
+
+ session.setVirtualHost( virtualHost );
- //todo //FIXME The virtual host must be validated by the server for the connection to open-ok
// See Spec (0.8.2). Section 3.1.2 Virtual Hosts
- if (protocolSession.getContextKey() == null)
+ if (session.getContextKey() == null)
{
- protocolSession.setContextKey(generateClientID());
+ session.setContextKey(generateClientID());
}
+
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody response = ConnectionOpenOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor(), // AMQP minor version
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion(), // AMQP minor version
body.virtualHost); // knownHosts
- protocolSession.getStateManager().changeState(AMQState.CONNECTION_OPEN);
- protocolSession.writeResponse(evt, response);
+ session.getStateManager().changeState(AMQState.CONNECTION_OPEN);
+ session.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -21,19 +21,22 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.ConnectionSecureBody;
+import org.apache.qpid.framing.ConnectionSecureOkBody;
+import org.apache.qpid.framing.ConnectionTuneBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.HeartbeatConfig;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+
import org.apache.log4j.Logger;
import javax.security.sasl.SaslServer;
@@ -50,35 +53,34 @@
return _instance;
}
- private ConnectionSecureOkMethodHandler()
- {
- }
+ private ConnectionSecureOkMethodHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionSecureOkBody> evt) throws AMQException
{
- ConnectionSecureOkBody body = evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final ConnectionSecureOkBody body = evt.getMethod();
AuthenticationManager authMgr = ApplicationRegistry.getInstance().getAuthenticationManager();
- SaslServer ss = protocolSession.getSaslServer();
+ SaslServer ss = session.getSaslServer();
if (ss == null)
{
throw new AMQException("No SASL context set up in session");
}
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
- AMQStateManager stateManager = protocolSession.getStateManager();
- byte major = protocolSession.getMajor();
- byte minor = protocolSession.getMinor();
+ byte major = session.getProtocolMajorVersion();
+ byte minor = session.getProtocolMinorVersion();
switch (authResult.status)
{
case ERROR:
// Can't do this as we violate protocol. Need to send Close
// throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
_logger.info("Authentication failed");
- disposeSaslServer(protocolSession);
- protocolSession.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
+ stateManager.changeState(AMQState.CONNECTION_CLOSING);
+ // Be aware of possible changes to parameter order as versions change.
+ session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
AMQConstant.NOT_ALLOWED.getName(), body.getClazz(), body.getMethod());
+ disposeSaslServer(session);
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
@@ -92,8 +94,8 @@
Integer.MAX_VALUE, // channelMax
ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
HeartbeatConfig.getInstance().getDelay()); // heartbeat
- protocolSession.writeResponse(evt, tune);
- disposeSaslServer(protocolSession);
+ session.writeResponse(evt, tune);
+ disposeSaslServer(session);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
@@ -101,7 +103,7 @@
AMQMethodBody challenge = ConnectionSecureBody.createMethodBody(
major, minor, // AMQP version (major, minor)
authResult.challenge); // challenge
- protocolSession.writeResponse(evt, challenge);
+ session.writeResponse(evt, challenge);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -20,18 +20,14 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
-import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ConnectionSecureBody;
import org.apache.qpid.framing.ConnectionStartOkBody;
import org.apache.qpid.framing.ConnectionTuneBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.HeartbeatConfig;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.security.auth.AuthenticationManager;
import org.apache.qpid.server.security.auth.AuthenticationResult;
@@ -39,10 +35,12 @@
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.commons.configuration.Configuration;
+import org.apache.log4j.Logger;
+
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
-
public class ConnectionStartOkMethodHandler implements StateAwareMethodListener<ConnectionStartOkBody>
{
private static final Logger _logger = Logger.getLogger(ConnectionStartOkMethodHandler.class);
@@ -56,13 +54,11 @@
return _instance;
}
- private ConnectionStartOkMethodHandler()
- {
- }
+ private ConnectionStartOkMethodHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionStartOkBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
final ConnectionStartOkBody body = evt.getMethod();
_logger.info("SASL Mechanism selected: " + body.mechanism);
_logger.info("Locale selected: " + body.locale);
@@ -72,18 +68,17 @@
SaslServer ss = null;
try
{
- ss = authMgr.createSaslServer(body.mechanism, protocolSession.getLocalFQDN());
- protocolSession.setSaslServer(ss);
+ ss = authMgr.createSaslServer(String.valueOf(body.mechanism), session.getLocalFQDN());
+ session.setSaslServer(ss);
AuthenticationResult authResult = authMgr.authenticate(ss, body.response);
//save clientProperties
- if (protocolSession.getClientProperties() == null)
+ if (session.getClientProperties() == null)
{
- protocolSession.setClientProperties(body.clientProperties);
+ session.setClientProperties(body.clientProperties);
}
- AMQStateManager stateManager = protocolSession.getStateManager();
switch (authResult.status)
{
case ERROR:
@@ -94,26 +89,26 @@
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody tune = ConnectionTuneBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor(), // AMQP minor version
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion(), // AMQP minor version
Integer.MAX_VALUE, // channelMax
getConfiguredFrameSize(), // frameMax
HeartbeatConfig.getInstance().getDelay()); // heartbeat
- protocolSession.writeRequest(evt.getChannelId(), tune, stateManager);
+ session.writeRequest(evt.getChannelId(), tune, stateManager);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody challenge = ConnectionSecureBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor(), // AMQP minor version
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion(), // AMQP minor version
authResult.challenge); // challenge
- protocolSession.writeRequest(evt.getChannelId(), challenge, stateManager);
+ session.writeRequest(evt.getChannelId(), challenge, stateManager);
}
}
catch (SaslException e)
{
- disposeSaslServer(protocolSession);
+ disposeSaslServer(session);
throw new AMQException("SASL error: " + e, e);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionTuneOkMethodHandler.java Wed Feb 14 12:02:03 2007
@@ -20,17 +20,16 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ConnectionTuneOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQState;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.log4j.Logger;
+
public class ConnectionTuneOkMethodHandler implements StateAwareMethodListener<ConnectionTuneOkBody>
{
private static final Logger _logger = Logger.getLogger(ConnectionTuneOkMethodHandler.class);
@@ -41,17 +40,19 @@
{
return _instance;
}
+
+ private ConnectionTuneOkMethodHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ConnectionTuneOkBody> evt) throws AMQException
{
- ConnectionTuneOkBody body = evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final ConnectionTuneOkBody body = evt.getMethod();
if (_logger.isDebugEnabled())
{
_logger.debug(body);
}
- protocolSession.getStateManager().changeState(AMQState.CONNECTION_NOT_OPENED);
- protocolSession.initHeartbeats(body.heartbeat);
- protocolSession.setFrameMax(body.getFrameMax());
+ stateManager.changeState(AMQState.CONNECTION_NOT_OPENED);
+ session.initHeartbeats(body.heartbeat);
+ session.setFrameMax(body.getFrameMax());
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Wed Feb 14 12:02:03 2007
@@ -1,24 +1,28 @@
/*
*
- * Copyright (c) 2006 The Apache Software Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
*
*/
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQMethodBody;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ExchangeBoundBody;
import org.apache.qpid.framing.ExchangeBoundOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
@@ -29,13 +33,13 @@
import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
-/**
- * @author Apache Software Foundation
- */
public class ExchangeBoundHandler implements StateAwareMethodListener<ExchangeBoundBody>
{
- private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
+ //private static final Logger _logger = Logger.getLogger(ExchangeBoundHandler.class);
public static final int OK = 0;
@@ -51,39 +55,42 @@
public static final int SPECIFIC_QUEUE_NOT_BOUND_WITH_RK = 6;
+ private static final ExchangeBoundHandler _instance = new ExchangeBoundHandler();
+
public static ExchangeBoundHandler getInstance()
{
return _instance;
}
- private ExchangeBoundHandler()
- {
- }
+ private ExchangeBoundHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
{
- byte major = protocolSession.getMajor();
- byte minor = protocolSession.getMinor();
-
- ExchangeBoundBody body = evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final ExchangeBoundBody body = evt.getMethod();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
- String exchangeName = body.exchange;
- String queueName = body.queue;
- String routingKey = body.routingKey;
+ byte major = session.getProtocolMajorVersion();
+ byte minor = session.getProtocolMinorVersion();
+
+ AMQShortString exchangeName = body.exchange;
+ AMQShortString queueName = body.queue;
+ AMQShortString routingKey = body.routingKey;
if (exchangeName == null)
{
throw new AMQException("Exchange exchange must not be null");
}
- Exchange exchange = protocolSession.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exchange = virtualHost.getExchangeRegistry().getExchange(exchangeName);
AMQMethodBody response;
+
if (exchange == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
response = ExchangeBoundOkBody.createMethodBody
(major, minor, // AMQP version (major, minor)
EXCHANGE_NOT_FOUND, // replyCode
- "Exchange " + exchangeName + " not found"); // replyText
+ new AMQShortString("Exchange " + exchangeName + " not found")); // replyText
}
else if (routingKey == null)
{
@@ -108,14 +115,14 @@
}
else
{
- AMQQueue queue = protocolSession.getQueueRegistry().getQueue(queueName);
+ AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
response = ExchangeBoundOkBody.createMethodBody
(major, minor, // AMQP version (major, minor)
QUEUE_NOT_FOUND, // replyCode
- "Queue " + queueName + " not found"); // replyText
+ new AMQShortString("Queue " + queueName + " not found")); // replyText
}
else
{
@@ -133,21 +140,22 @@
response = ExchangeBoundOkBody.createMethodBody
(major, minor, // AMQP version (major, minor)
QUEUE_NOT_BOUND, // replyCode
- "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText
+ new AMQShortString("Queue " + queueName + " not bound to exchange " +
+ exchangeName)); // replyText
}
}
}
}
else if (queueName != null)
{
- AMQQueue queue = protocolSession.getQueueRegistry().getQueue(queueName);
+ AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
// AMQP version change: Be aware of possible changes to parameter order as versions change.
response = ExchangeBoundOkBody.createMethodBody
(major, minor, // AMQP version (major, minor)
QUEUE_NOT_FOUND, // replyCode
- "Queue " + queueName + " not found"); // replyText
+ new AMQShortString("Queue " + queueName + " not found")); // replyText
}
else
{
@@ -165,8 +173,8 @@
response = ExchangeBoundOkBody.createMethodBody
(major, minor, // AMQP version (major, minor)
SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
- "Queue " + queueName + " not bound with routing key " +
- body.routingKey + " to exchange " + exchangeName); // replyText
+ new AMQShortString("Queue " + queueName + " not bound with routing key " +
+ body.routingKey + " to exchange " + exchangeName)); // replyText
}
}
}
@@ -186,10 +194,10 @@
response = ExchangeBoundOkBody.createMethodBody
(major, minor, // AMQP version (major, minor)
NO_QUEUE_BOUND_WITH_RK, // replyCode
- "No queue bound with routing key " + body.routingKey +
- " to exchange " + exchangeName); // replyText
+ new AMQShortString("No queue bound with routing key " + body.routingKey +
+ " to exchange " + exchangeName)); // replyText
}
}
- protocolSession.writeResponse(evt, response);
+ session.writeResponse(evt, response);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Wed Feb 14 12:02:03 2007
@@ -20,9 +20,8 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
-import org.apache.qpid.AMQException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUnknownExchangeType;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.ExchangeDeclareBody;
@@ -30,13 +29,14 @@
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
-import org.apache.qpid.server.registry.ApplicationRegistry;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
public class ExchangeDeclareHandler implements StateAwareMethodListener<ExchangeDeclareBody>
{
@@ -49,26 +49,26 @@
return _instance;
}
- private final ExchangeFactory exchangeFactory;
-
- private ExchangeDeclareHandler()
- {
- exchangeFactory = ApplicationRegistry.getInstance().getExchangeFactory();
- }
+ private ExchangeDeclareHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeclareBody> evt) throws AMQException
{
+ AMQProtocolSession session = stateManager.getProtocolSession();
final ExchangeDeclareBody body = evt.getMethod();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+
if (_logger.isDebugEnabled())
{
_logger.debug("Request to declare exchange of type " + body.type + " with name " + body.exchange);
}
- ExchangeRegistry exchangeRegistry = protocolSession.getExchangeRegistry();
synchronized(exchangeRegistry)
{
Exchange exchange = exchangeRegistry.getExchange(body.exchange);
+
+
if (exchange == null)
{
if(body.passive && ((body.type == null) || body.type.length() ==0))
@@ -85,23 +85,24 @@
}
catch(AMQUnknownExchangeType e)
{
- throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange type: " + body.type,e);
+ throw body.getConnectionException(AMQConstant.COMMAND_INVALID.getCode(), "Unknown exchange type: " + body.type, e);
}
}
}
else if (!exchange.getType().equals(body.type))
{
- throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange + " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
+ throw new AMQConnectionException(AMQConstant.NOT_ALLOWED.getCode(), "Attempt to redeclare exchange: " + body.exchange +
+ " of type " + exchange.getType() + " to " + body.type +".",body.getClazz(), body.getMethod(),body.getMajor(),body.getMinor());
}
}
if(!body.nowait)
{
// Be aware of possible changes to parameter order as versions change.
AMQMethodBody response = ExchangeDeclareOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor()); // AMQP minor version
- protocolSession.writeResponse(evt, response);
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion()); // AMQP minor version
+ session.writeResponse(evt, response);
}
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Wed Feb 14 12:02:03 2007
@@ -21,19 +21,22 @@
package org.apache.qpid.server.handler;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.ExchangeDeleteBody;
import org.apache.qpid.framing.ExchangeDeleteOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.exchange.ExchangeInUseException;
import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+//import org.apache.log4j.Logger;
public class ExchangeDeleteHandler implements StateAwareMethodListener<ExchangeDeleteBody>
{
+ //private static final Logger _logger = Logger.getLogger(ExchangeDeleteHandler.class);
+
private static final ExchangeDeleteHandler _instance = new ExchangeDeleteHandler();
public static ExchangeDeleteHandler getInstance()
@@ -41,21 +44,22 @@
return _instance;
}
- private ExchangeDeleteHandler()
- {
- }
+ private ExchangeDeleteHandler() {}
- public void methodReceived(AMQProtocolSession protocolSession,
- AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
+ public void methodReceived(AMQStateManager stateManager, AMQMethodEvent<ExchangeDeleteBody> evt) throws AMQException
{
- ExchangeDeleteBody body = evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final ExchangeDeleteBody body = evt.getMethod();
+ VirtualHost virtualHost = session.getVirtualHost();
+ ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry();
+
try
{
- protocolSession.getExchangeRegistry().unregisterExchange(body.exchange, body.ifUnused);
+ exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
// Be aware of possible changes to parameter order as versions change.
- protocolSession.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor())); // AMQP minor version
+ session.writeResponse(evt, ExchangeDeleteOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
}
catch (ExchangeInUseException e)
{
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageAppendHandler.java Wed Feb 14 12:02:03 2007
@@ -22,15 +22,19 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageAppendBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageAppendHandler implements StateAwareMethodListener<MessageAppendBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageAppendHandler.class);
+
private static MessageAppendHandler _instance = new MessageAppendHandler();
public static MessageAppendHandler getInstance()
@@ -40,12 +44,14 @@
private MessageAppendHandler() {}
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageAppendBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageAppendBody> evt) throws AMQException
{
- // TODO
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ channel.addMessageAppend(evt.getMethod());
+ session.writeResponse(evt, MessageOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCancelHandler.java Wed Feb 14 12:02:03 2007
@@ -26,14 +26,16 @@
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
import org.apache.qpid.server.AMQChannel;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageCancelHandler implements StateAwareMethodListener<MessageCancelBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageCancelHandler.class);
+
private static MessageCancelHandler _instance = new MessageCancelHandler();
public static MessageCancelHandler getInstance()
@@ -42,21 +44,19 @@
}
private MessageCancelHandler() {}
-
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageCancelBody> evt)
- throws AMQException
+
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageCancelBody> evt) throws AMQException
{
- final AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
+ AMQProtocolSession session = stateManager.getProtocolSession();
final MessageCancelBody body = evt.getMethod();
- channel.unsubscribeConsumer(protocolSession, body.destination);
+ final AMQChannel channel = session.getChannel(evt.getChannelId());
+ channel.unsubscribeConsumer(session, body.destination);
// Be aware of possible changes to parameter order as versions change.
final AMQMethodBody methodBody = MessageOkBody.createMethodBody(
- protocolSession.getMajor(), // AMQP major version
- protocolSession.getMinor()); // AMQP minor version
- protocolSession.writeResponse(evt, methodBody);
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion()); // AMQP minor version
+ session.writeResponse(evt, methodBody);
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCheckpointHandler.java Wed Feb 14 12:02:03 2007
@@ -23,14 +23,16 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageCheckpointBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageCheckpointHandler implements StateAwareMethodListener<MessageCheckpointBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageCheckpointHandler.class);
+
private static MessageCheckpointHandler _instance = new MessageCheckpointHandler();
public static MessageCheckpointHandler getInstance()
@@ -40,10 +42,7 @@
private MessageCheckpointHandler() {}
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageCheckpointBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageCheckpointBody> evt) throws AMQException
{
// TODO
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageCloseHandler.java Wed Feb 14 12:02:03 2007
@@ -22,15 +22,19 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.MessageCloseBody;
+import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQMethodEvent;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.protocol.AMQProtocolSession;
-import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+//import org.apache.log4j.Logger;
+
public class MessageCloseHandler implements StateAwareMethodListener<MessageCloseBody>
{
+ //private static final Logger _logger = Logger.getLogger(MessageCloseHandler.class);
+
private static MessageCloseHandler _instance = new MessageCloseHandler();
public static MessageCloseHandler getInstance()
@@ -39,13 +43,15 @@
}
private MessageCloseHandler() {}
-
-
- public void methodReceived (AMQProtocolSession protocolSession,
- AMQMethodEvent<MessageCloseBody> evt)
- throws AMQException
+
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageCloseBody> evt) throws AMQException
{
- // TODO
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ AMQChannel channel = session.getChannel(evt.getChannelId());
+ channel.addMessageClose(evt.getMethod());
+ session.writeResponse(evt, MessageOkBody.createMethodBody(
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
}
}
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java?view=diff&rev=507672&r1=507671&r2=507672
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/handler/MessageConsumeHandler.java Wed Feb 14 12:02:03 2007
@@ -20,9 +20,9 @@
*/
package org.apache.qpid.server.handler;
-import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInvalidSelectorException;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.MessageConsumeBody;
import org.apache.qpid.framing.MessageOkBody;
import org.apache.qpid.protocol.AMQConstant;
@@ -32,7 +32,11 @@
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
+import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.state.StateAwareMethodListener;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+import org.apache.log4j.Logger;
public class MessageConsumeHandler implements StateAwareMethodListener<MessageConsumeBody>
{
@@ -47,13 +51,13 @@
private MessageConsumeHandler() {}
-
- public void methodReceived (AMQProtocolSession session,
- AMQMethodEvent<MessageConsumeBody> evt)
- throws AMQException
+ public void methodReceived (AMQStateManager stateManager, AMQMethodEvent<MessageConsumeBody> evt) throws AMQException
{
- MessageConsumeBody body = evt.getMethod();
+ AMQProtocolSession session = stateManager.getProtocolSession();
+ final MessageConsumeBody body = evt.getMethod();
final int channelId = evt.getChannelId();
+ VirtualHost virtualHost = session.getVirtualHost();
+ QueueRegistry queueRegistry = virtualHost.getQueueRegistry();
AMQChannel channel = session.getChannel(channelId);
if (channel == null)
@@ -63,32 +67,33 @@
}
else
{
- AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : session.getQueueRegistry().getQueue(body.queue);
+ AMQQueue queue = body.queue == null ? channel.getDefaultQueue() : queueRegistry.getQueue(body.queue);
if (queue == null)
{
_log.info("No queue for '" + body.queue + "'");
- if(body.queue!=null)
+ if(body.queue != null)
{
session.closeChannelRequest(evt.getChannelId(), AMQConstant.NOT_FOUND.getCode(),
- "No such queue, '" + body.queue + "'");
+ new AMQShortString("No such queue, '" + body.queue + "'"));
}
else
{
session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
- "No queue name provided, no default queue defined.", body.getClazz(), body.getMethod());
+ new AMQShortString("No queue name provided, no default queue defined."),
+ body.getClazz(), body.getMethod());
}
}
else
{
try
{
- /*AMQShort*/String destination = channel.subscribeToQueue
+ AMQShortString destination = channel.subscribeToQueue
(body.destination, queue, session, !body.noAck, /*XXX*/null, body.noLocal, body.exclusive);
// Be aware of possible changes to parameter order as versions change.
session.writeResponse(evt, MessageOkBody.createMethodBody(
- session.getMajor(), // AMQP major version
- session.getMinor())); // AMQP minor version
+ session.getProtocolMajorVersion(), // AMQP major version
+ session.getProtocolMinorVersion())); // AMQP minor version
//now allow queue to start async processing of any backlog of messages
queue.deliverAsync();
@@ -97,13 +102,14 @@
{
_log.info("Closing connection due to invalid selector: " + ise.getMessage());
session.closeChannelRequest(evt.getChannelId(), AMQConstant.INVALID_SELECTOR.getCode(),
- ise.getMessage());
+ new AMQShortString(ise.getMessage()));
}
catch (ConsumerTagNotUniqueException e)
{
_log.info("Closing connection due to duplicate (non-unique) consumer tag: " + e.getMessage());
session.closeSessionRequest(AMQConstant.NOT_ALLOWED.getCode(),
- "Non-unique consumer tag, '" + body.destination + "'", body.getClazz(), body.getMethod());
+ new AMQShortString("Non-unique consumer tag, '" + body.destination + "'"),
+ body.getClazz(), body.getMethod());
}
catch (AMQQueue.ExistingExclusiveSubscription e)
{