You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/01/16 16:16:55 UTC
svn commit: r496725 [3/11] - in /incubator/qpid/branches/perftesting/qpid:
gentools/ gentools/src/org/apache/qpid/gentools/ gentools/templ.cpp/
gentools/templ.java/ java/ java/broker/ java/broker/etc/
java/broker/src/main/java/org/apache/qpid/server/ j...
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersBinding.java Tue Jan 16 07:16:39 2007
@@ -21,12 +21,11 @@
package org.apache.qpid.server.exchange;
import org.apache.log4j.Logger;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.AMQTypedValue;
-import java.util.Collections;
-import java.util.Map;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Set;
+import java.util.*;
/**
* Defines binding and matching based on a set of headers.
@@ -35,11 +34,53 @@
{
private static final Logger _logger = Logger.getLogger(HeadersBinding.class);
- private final Map _mappings = new HashMap();
- private final Set<Object> required = new HashSet<Object>();
- private final Set<Map.Entry> matches = new HashSet<Map.Entry>();
+ private final FieldTable _mappings = new FieldTable();
+ private final Set<String> required = new HashSet<String>();
+ private final Map<String,Object> matches = new HashMap<String,Object>();
private boolean matchAny;
+ private final class MatchesOrProcessor implements FieldTable.FieldTableElementProcessor
+ {
+ private Boolean _result = Boolean.FALSE;
+
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ if((value != null) && (value.getValue() != null) && value.getValue().equals(matches.get(propertyName)))
+ {
+ _result = Boolean.TRUE;
+ return false;
+ }
+ return true;
+ }
+
+ public Object getResult()
+ {
+ return _result;
+ }
+ }
+
+ private final class RequiredOrProcessor implements FieldTable.FieldTableElementProcessor
+ {
+ Boolean _result = Boolean.FALSE;
+
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ if(required.contains(propertyName))
+ {
+ _result = Boolean.TRUE;
+ return false;
+ }
+ return true;
+ }
+
+ public Object getResult()
+ {
+ return _result;
+ }
+ }
+
+
+
/**
* Creates a binding for a set of mappings. Those mappings whose value is
* null or the empty string are assumed only to be required headers, with
@@ -47,33 +88,50 @@
* define a required match of value.
* @param mappings the defined mappings this binding should use
*/
- HeadersBinding(Map mappings)
+
+ HeadersBinding(FieldTable mappings)
{
- //noinspection unchecked
- this(mappings == null ? new HashSet<Map.Entry>() : mappings.entrySet());
- _mappings.putAll(mappings);
+ Enumeration propertyNames = mappings.getPropertyNames();
+ while(propertyNames.hasMoreElements())
+ {
+ String propName = (String) propertyNames.nextElement();
+ _mappings.put(propName, mappings.getObject(propName));
+ }
+ initMappings();
}
- private HeadersBinding(Set<Map.Entry> entries)
+ private void initMappings()
{
- for (Map.Entry e : entries)
+
+ _mappings.processOverElements(new FieldTable.FieldTableElementProcessor()
{
- if (isSpecial(e.getKey()))
- {
- processSpecial((String) e.getKey(), e.getValue());
- }
- else if (e.getValue() == null || e.getValue().equals(""))
+
+ public boolean processElement(String propertyName, AMQTypedValue value)
{
- required.add(e.getKey());
+ if (isSpecial(propertyName))
+ {
+ processSpecial(propertyName, value.getValue());
+ }
+ else if (value.getValue() == null || value.getValue().equals(""))
+ {
+ required.add(propertyName);
+ }
+ else
+ {
+ matches.put(propertyName,value.getValue());
+ }
+
+ return true;
}
- else
+
+ public Object getResult()
{
- matches.add(e);
+ return null;
}
- }
+ });
}
- protected Map getMappings()
+ protected FieldTable getMappings()
{
return _mappings;
}
@@ -84,7 +142,7 @@
* @return true if the headers define any required keys and match any required
* values
*/
- public boolean matches(Map headers)
+ public boolean matches(FieldTable headers)
{
if(headers == null)
{
@@ -96,18 +154,37 @@
}
}
- private boolean and(Map headers)
+ private boolean and(FieldTable headers)
{
- //need to match all the defined mapping rules:
- return headers.keySet().containsAll(required)
- && headers.entrySet().containsAll(matches);
+ if(headers.keys().containsAll(required))
+ {
+ for(Map.Entry<String, Object> e : matches.entrySet())
+ {
+ if(!e.getValue().equals(headers.getObject(e.getKey())))
+ {
+ return false;
+ }
+ }
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
- private boolean or(Map headers)
+
+ private boolean or(final FieldTable headers)
{
- //only need to match one mapping rule:
- return !Collections.disjoint(headers.keySet(), required)
- || !Collections.disjoint(headers.entrySet(), matches);
+ if(required.isEmpty() || !(Boolean) headers.processOverElements(new RequiredOrProcessor()))
+ {
+ return ((!matches.isEmpty()) && (Boolean) headers.processOverElements(new MatchesOrProcessor()))
+ || (required.isEmpty() && matches.isEmpty());
+ }
+ else
+ {
+ return true;
+ }
}
private void processSpecial(String key, Object value)
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Tue Jan 16 07:16:39 2007
@@ -22,10 +22,7 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FieldTableFactory;
+import org.apache.qpid.framing.*;
import org.apache.qpid.server.management.MBeanConstructor;
import org.apache.qpid.server.management.MBeanDescription;
import org.apache.qpid.server.queue.AMQMessage;
@@ -34,10 +31,7 @@
import javax.management.JMException;
import javax.management.openmbean.*;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.concurrent.CopyOnWriteArrayList;
/**
@@ -119,16 +113,24 @@
String queueName = registration.queue.getName();
HeadersBinding headers = registration.binding;
- Map<Object, Object> headerMappings = headers.getMappings();
- List<String> mappingList = new ArrayList<String>();
+ FieldTable headerMappings = headers.getMappings();
+ final List<String> mappingList = new ArrayList<String>();
- for (Map.Entry<Object, Object> en : headerMappings.entrySet())
+ headerMappings.processOverElements(new FieldTable.FieldTableElementProcessor()
{
- String key = en.getKey().toString();
- String value = en.getValue().toString();
- mappingList.add(key + "=" + value);
- }
+ public boolean processElement(String propertyName, AMQTypedValue value)
+ {
+ mappingList.add(propertyName + "=" + value.getValue());
+ return true;
+ }
+
+ public Object getResult()
+ {
+ return mappingList;
+ }
+ });
+
Object[] bindingItemValues = {count++, queueName, mappingList.toArray(new String[0])};
CompositeData bindingData = new CompositeDataSupport(_bindingDataType, _bindingItemNames, bindingItemValues);
@@ -155,7 +157,7 @@
}
String[] bindings = binding.split(",");
- FieldTable fieldTable = FieldTableFactory.newFieldTable();
+ FieldTable bindingMap = new FieldTable();
for (int i = 0; i < bindings.length; i++)
{
String[] keyAndValue = bindings[i].split("=");
@@ -163,10 +165,10 @@
{
throw new JMException("Format for headers binding should be \"<attribute1>=<value1>,<attribute2>=<value2>\" ");
}
- fieldTable.put(keyAndValue[0], keyAndValue[1]);
+ bindingMap.setString(keyAndValue[0], keyAndValue[1]);
}
- _bindings.add(new Registration(new HeadersBinding(fieldTable), queue));
+ _bindings.add(new Registration(new HeadersBinding(bindingMap), queue));
}
} // End of MBean class
@@ -185,7 +187,7 @@
public void route(AMQMessage payload) throws AMQException
{
- Map headers = getHeaders(payload.getContentHeaderBody());
+ FieldTable headers = getHeaders(payload.getContentHeaderBody());
if (_logger.isDebugEnabled())
{
_logger.debug("Exchange " + getName() + ": routing message with headers " + headers);
@@ -248,7 +250,7 @@
return !_bindings.isEmpty();
}
- protected Map getHeaders(ContentHeaderBody contentHeaderFrame)
+ protected FieldTable getHeaders(ContentHeaderBody contentHeaderFrame)
{
//what if the content type is not 'basic'? 'file' and 'stream' content classes also define headers,
//but these are not yet implemented.
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicCancelMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -54,7 +54,12 @@
channel.unsubscribeConsumer(protocolSession, body.consumerTag);
if(!body.nowait)
{
- final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(), body.consumerTag);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame responseFrame = BasicCancelOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ body.consumerTag); // consumerTag
protocolSession.writeFrame(responseFrame);
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -81,7 +81,12 @@
body.arguments, body.noLocal);
if (!body.nowait)
{
- session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId, consumerTag));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(BasicConsumeOkBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag)); // consumerTag
}
//now allow queue to start async processing of any backlog of messages
@@ -90,16 +95,28 @@
catch (AMQInvalidSelectorException ise)
{
_log.info("Closing connection due to invalid selector");
- session.writeFrame(ChannelCloseBody.createAMQFrame(channelId, AMQConstant.INVALID_SELECTOR.getCode(),
- ise.getMessage(), BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ChannelCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.INVALID_SELECTOR.getCode(), // replyCode
+ ise.getMessage())); // replyText
}
catch (ConsumerTagNotUniqueException e)
{
String msg = "Non-unique consumer tag, '" + body.consumerTag + "'";
- session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId, AMQConstant.NOT_ALLOWED.getCode(), msg,
- BasicConsumeBody.CLASS_ID,
- BasicConsumeBody.METHOD_ID));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(ConnectionCloseBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ BasicConsumeBody.getClazz((byte)8, (byte)0), // classId
+ BasicConsumeBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ msg)); // replyText
}
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -64,7 +64,15 @@
protocolSession.closeChannel(evt.getChannelId());
// TODO: modify code gen to make getClazz and getMethod public methods rather than protected
// then we can remove the hardcoded 0,0
- AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(), 500, "Unknown exchange name", 0, 0);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame cf = ChannelCloseBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ChannelCloseBody.getClazz((byte)8, (byte)0), // classId
+ ChannelCloseBody.getMethod((byte)8, (byte)0), // methodId
+ 500, // replyCode
+ "Unknown exchange name"); // replyText
protocolSession.writeFrame(cf);
}
else
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicQosHandler.java Tue Jan 16 07:16:39 2007
@@ -44,6 +44,9 @@
AMQProtocolSession session, AMQMethodEvent<BasicQosBody> evt) throws AMQException
{
session.getChannel(evt.getChannelId()).setPrefetchCount(evt.getMethod().prefetchCount);
- session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(new AMQFrame(evt.getChannelId(), new BasicQosOkBody((byte)8, (byte)0)));
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelCloseHandler.java Tue Jan 16 07:16:39 2007
@@ -55,7 +55,10 @@
_logger.info("Received channel close for id " + evt.getChannelId() + " citing class " + body.classId +
" and method " + body.methodId);
protocolSession.closeChannel(evt.getChannelId());
- AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ChannelCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelFlowHandler.java Tue Jan 16 07:16:39 2007
@@ -58,6 +58,12 @@
channel.setSuspended(!body.active);
_logger.debug("Channel.Flow for channel " + evt.getChannelId() + ", active=" + body.active);
- AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(), body.active);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ChannelFlowOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ body.active); // active
protocolSession.writeFrame(response);
- }}
+ }
+}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Tue Jan 16 07:16:39 2007
@@ -55,7 +55,10 @@
final AMQChannel channel = new AMQChannel(evt.getChannelId(), registry.getMessageStore(),
exchangeRegistry);
protocolSession.addChannel(channel);
- AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ChannelOpenOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionCloseMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -62,7 +62,10 @@
{
_logger.error("Error closing protocol session: " + e, e);
}
- final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame response = ConnectionCloseOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -64,7 +64,12 @@
contextKey = generateClientID();
}
protocolSession.setContextKey(contextKey);
- AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0, contextKey);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ConnectionOpenOkBody.createAMQFrame((short)0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ contextKey); // knownHosts
stateManager.changeState(AMQState.CONNECTION_OPEN);
protocolSession.writeFrame(response);
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionSecureOkMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -75,25 +75,43 @@
// throw new AMQException(AMQConstant.NOT_ALLOWED.getCode(), AMQConstant.NOT_ALLOWED.getName());
_logger.info("Authentication failed");
stateManager.changeState(AMQState.CONNECTION_CLOSING);
- AMQFrame close = ConnectionCloseBody.createAMQFrame(0, AMQConstant.NOT_ALLOWED.getCode(),
- AMQConstant.NOT_ALLOWED.getName(),
- ConnectionCloseBody.CLASS_ID,
- ConnectionCloseBody.METHOD_ID);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame close = ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ ConnectionCloseBody.getClazz((byte)8, (byte)0), // classId
+ ConnectionCloseBody.getMethod((byte)8, (byte)0), // methodId
+ AMQConstant.NOT_ALLOWED.getCode(), // replyCode
+ AMQConstant.NOT_ALLOWED.getName()); // replyText
protocolSession.writeFrame(close);
disposeSaslServer(protocolSession);
break;
case SUCCESS:
_logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE,
- ConnectionStartOkMethodHandler.getConfiguredFrameSize(),
- HeartbeatConfig.getInstance().getDelay());
+ // TODO: Check the value of channelMax here: This should be the max
+ // value of a 2-byte unsigned integer (as channel is only 2 bytes on the wire),
+ // not Integer.MAX_VALUE (which is signed 4 bytes).
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ Integer.MAX_VALUE, // channelMax
+ ConnectionStartOkMethodHandler.getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
protocolSession.writeFrame(tune);
disposeSaslServer(protocolSession);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ authResult.challenge); // challenge
protocolSession.writeFrame(challenge);
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionStartOkMethodHandler.java Tue Jan 16 07:16:39 2007
@@ -92,13 +92,24 @@
_logger.info("Connected as: " + ss.getAuthorizationID());
stateManager.changeState(AMQState.CONNECTION_NOT_TUNED);
- AMQFrame tune = ConnectionTuneBody.createAMQFrame(0, Integer.MAX_VALUE, getConfiguredFrameSize(),
- HeartbeatConfig.getInstance().getDelay());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame tune = ConnectionTuneBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ Integer.MAX_VALUE, // channelMax
+ getConfiguredFrameSize(), // frameMax
+ HeartbeatConfig.getInstance().getDelay()); // heartbeat
protocolSession.writeFrame(tune);
break;
case CONTINUE:
stateManager.changeState(AMQState.CONNECTION_NOT_AUTH);
- AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0, authResult.challenge);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame challenge = ConnectionSecureBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ authResult.challenge); // challenge
protocolSession.writeFrame(challenge);
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeBoundHandler.java Tue Jan 16 07:16:39 2007
@@ -64,6 +64,11 @@
ExchangeRegistry exchangeRegistry, AMQProtocolSession protocolSession,
AMQMethodEvent<ExchangeBoundBody> evt) throws AMQException
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ byte major = (byte)8;
+ byte minor = (byte)0;
+
ExchangeBoundBody body = evt.getMethod();
String exchangeName = body.exchange;
@@ -77,8 +82,11 @@
AMQFrame response;
if (exchange == null)
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), EXCHANGE_NOT_FOUND,
- "Exchange " + exchangeName + " not found");
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ EXCHANGE_NOT_FOUND, // replyCode
+ "Exchange " + exchangeName + " not found"); // replyText
}
else if (routingKey == null)
{
@@ -86,11 +94,19 @@
{
if (exchange.hasBindings())
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), NO_BINDINGS, null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ NO_BINDINGS, // replyCode
+ null); // replyText
}
}
else
@@ -98,20 +114,29 @@
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
- "Queue " + queueName + " not found");
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_FOUND, // replyCode
+ "Queue " + queueName + " not found"); // replyText
}
else
{
if (exchange.isBound(queue))
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK, null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_BOUND,
- "Queue " + queueName + " not bound to exchange " +
- exchangeName);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_BOUND, // replyCode
+ "Queue " + queueName + " not bound to exchange " + exchangeName); // replyText
}
}
}
@@ -121,24 +146,30 @@
AMQQueue queue = queueRegistry.getQueue(queueName);
if (queue == null)
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), QUEUE_NOT_FOUND,
- "Queue " + queueName + " not found");
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ QUEUE_NOT_FOUND, // replyCode
+ "Queue " + queueName + " not found"); // replyText
}
else
{
if (exchange.isBound(body.routingKey, queue))
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
- null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- SPECIFIC_QUEUE_NOT_BOUND_WITH_RK,
- "Queue " + queueName +
- " not bound with routing key " +
- body.routingKey + " to exchange " +
- exchangeName);
+ major, minor, // AMQP version (major, minor)
+ SPECIFIC_QUEUE_NOT_BOUND_WITH_RK, // replyCode
+ "Queue " + queueName + " not bound with routing key " +
+ body.routingKey + " to exchange " + exchangeName); // replyText
}
}
}
@@ -146,16 +177,20 @@
{
if (exchange.isBound(body.routingKey))
{
- response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(), OK,
- null);
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
+ response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
+ major, minor, // AMQP version (major, minor)
+ OK, // replyCode
+ null); // replyText
}
else
{
+ // AMQP version change: Be aware of possible changes to parameter order as versions change.
response = ExchangeBoundOkBody.createAMQFrame(evt.getChannelId(),
- NO_QUEUE_BOUND_WITH_RK,
- "No queue bound with routing key " +
- body.routingKey + " to exchange " +
- exchangeName);
+ major, minor, // AMQP version (major, minor)
+ NO_QUEUE_BOUND_WITH_RK, // replyCode
+ "No queue bound with routing key " + body.routingKey +
+ " to exchange " + exchangeName); // replyText
}
}
protocolSession.writeFrame(response);
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java Tue Jan 16 07:16:39 2007
@@ -75,7 +75,10 @@
}
if(!body.nowait)
{
- AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ExchangeDeclareOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeleteHandler.java Tue Jan 16 07:16:39 2007
@@ -53,7 +53,10 @@
try
{
exchangeRegistry.unregisterExchange(body.exchange, body.ifUnused);
- AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ExchangeDeleteOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
catch (ExchangeInUseException e)
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java Tue Jan 16 07:16:39 2007
@@ -90,7 +90,10 @@
}
if (!body.nowait)
{
- final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame response = QueueBindOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0);
protocolSession.writeFrame(response);
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java Tue Jan 16 07:16:39 2007
@@ -102,7 +102,14 @@
}
if (!body.nowait)
{
- AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(), body.queue, 0L, 0L);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame response = QueueDeclareOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0L, // consumerCount
+ 0L, // messageCount
+ body.queue); // queue
_log.info("Queue " + body.queue + " declared successfully");
protocolSession.writeFrame(response);
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeleteHandler.java Tue Jan 16 07:16:39 2007
@@ -81,7 +81,12 @@
{
int purged = queue.delete(body.ifUnused, body.ifEmpty);
_store.removeQueue(queue.getName());
- session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(), purged));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ session.writeFrame(QueueDeleteOkBody.createAMQFrame(evt.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ purged)); // messageCount
}
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxCommitHandler.java Tue Jan 16 07:16:39 2007
@@ -52,7 +52,10 @@
try{
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.commit();
- protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(TxCommitOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
channel.processReturns(protocolSession);
}catch(AMQException e){
throw evt.getMethod().getChannelException(e.getErrorCode(), "Failed to commit: " + e.getMessage());
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxRollbackHandler.java Tue Jan 16 07:16:39 2007
@@ -51,7 +51,10 @@
try{
AMQChannel channel = protocolSession.getChannel(evt.getChannelId());
channel.rollback();
- protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(TxRollbackOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
//Now resend all the unacknowledged messages back to the original subscribers.
//(Must be done after the TxnRollback-ok response).
channel.resend(protocolSession);
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/TxSelectHandler.java Tue Jan 16 07:16:39 2007
@@ -48,6 +48,9 @@
AMQMethodEvent<TxSelectBody> evt) throws AMQException
{
protocolSession.getChannel(evt.getChannelId()).setTransactional(true);
- protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId()));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(TxSelectOkBody.createAMQFrame(evt.getChannelId(), (byte)8, (byte)0));
}
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java Tue Jan 16 07:16:39 2007
@@ -165,8 +165,14 @@
_minor = pi.protocolMinor;
String mechanisms = ApplicationRegistry.getInstance().getAuthenticationManager().getMechanisms();
String locales = "en_US";
- AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0, pi.protocolMajor, pi.protocolMinor, null,
- mechanisms.getBytes(), locales.getBytes());
+ // Interfacing with generated code - be aware of possible changes to parameter order as versions change.
+ AMQFrame response = ConnectionStartBody.createAMQFrame((short) 0,
+ _major, _minor, // AMQP version (major, minor)
+ locales.getBytes(), // locales
+ mechanisms.getBytes(), // mechanisms
+ null, // serverProperties
+ (short)_major, // versionMajor
+ (short)_minor); // versionMinor
_minaProtocolSession.write(response);
}
catch (AMQException e)
@@ -316,8 +322,13 @@
return _channelMap.get(channelId);
}
- public void addChannel(AMQChannel channel)
+ public void addChannel(AMQChannel channel) throws AMQException
{
+ if (_closed)
+ {
+ throw new AMQException("Session is closed");
+ }
+
_channelMap.put(channel.getChannelId(), channel);
checkForNotification();
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java Tue Jan 16 07:16:39 2007
@@ -168,11 +168,20 @@
}
else if(throwable instanceof IOException)
{
- _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
+ _logger.error("IOException caught in" + session + ", session closed implictly: " + throwable, throwable);
}
else
{
- protocolSession.write(ConnectionCloseBody.createAMQFrame(0, 200, throwable.getMessage(), 0, 0));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.write(ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ 200, // replyCode
+ throwable.getMessage() // replyText
+ ));
_logger.error("Exception caught in" + session + ", closing session explictly: " + throwable, throwable);
protocolSession.close();
}
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSession.java Tue Jan 16 07:16:39 2007
@@ -70,7 +70,7 @@
* @param channel the channel to associate with this session. It is an error to
* associate the same channel with more than one session but this is not validated.
*/
- void addChannel(AMQChannel channel);
+ void addChannel(AMQChannel channel) throws AMQException;
/**
* Close a specific channel. This will remove any resources used by the channel, including:
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQProtocolSessionMBean.java Tue Jan 16 07:16:39 2007
@@ -18,6 +18,9 @@
package org.apache.qpid.server.protocol;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.framing.ConnectionCloseBody;
+import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.MBeanConstructor;
@@ -183,33 +186,25 @@
}
/**
- * @see AMQMinaProtocolSession#closeChannel(int)
- */
- public void closeChannel(int id) throws JMException
- {
- try
- {
- AMQChannel channel = _session.getChannel(id);
- if (channel == null)
- {
- throw new JMException("The channel (channel Id = " + id + ") does not exist");
- }
-
- _session.closeChannel(id);
- }
- catch (AMQException ex)
- {
- throw new MBeanException(ex, ex.toString());
- }
- }
-
- /**
* closes the connection. The administrator can use this management operation to close connection to free up
* resources.
* @throws JMException
*/
public void closeConnection() throws JMException
{
+
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ final AMQFrame response = ConnectionCloseBody.createAMQFrame(0,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ 0, // classId
+ 0, // methodId
+ AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+ "Broker Management Console has closing the connection." // replyText
+ );
+ _session.writeFrame(response);
+
try
{
_session.closeSession();
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/ManagedConnection.java Tue Jan 16 07:16:39 2007
@@ -114,15 +114,6 @@
void rollbackTransactions(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId) throws JMException;
/**
- * Unsubscribes the consumers and unregisters the channel from managed objects.
- */
- @MBeanOperation(name="closeChannel",
- description="Closes the channel with given channel Id and connected consumers will be unsubscribed",
- impact= MBeanOperationInfo.ACTION)
- void closeChannel(@MBeanOperationParameter(name="channel Id", description="channel Id")int channelId)
- throws Exception;
-
- /**
* Closes all the related channels and unregisters this connection from managed objects.
*/
@MBeanOperation(name="closeConnection",
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Tue Jan 16 07:16:39 2007
@@ -157,10 +157,20 @@
public CompositeAMQDataBlock getDataBlock(int channel, String consumerTag, long deliveryTag)
{
+
AMQFrame[] allFrames = new AMQFrame[2 + _contentBodies.size()];
- allFrames[0] = BasicDeliverBody.createAMQFrame(channel, consumerTag, deliveryTag, _redelivered,
- getExchangeName(), getRoutingKey());
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ allFrames[0] = BasicDeliverBody.createAMQFrame(channel,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ getExchangeName(), // exchange
+ _redelivered, // redelivered
+ getRoutingKey() // routingKey
+ );
allFrames[1] = ContentHeaderBody.createAMQFrame(channel, _contentHeaderBody);
for (int i = 2; i < allFrames.length; i++)
{
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java Tue Jan 16 07:16:39 2007
@@ -213,7 +213,10 @@
{
return;
}
- _log.info("Async Delivery Message:" + message + " to :" + sub);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Async Delivery Message:" + message + " to :" + sub);
+ }
sub.send(message, _queue);
@@ -278,7 +281,10 @@
public void deliver(String name, AMQMessage msg) throws FailedDequeueException
{
- _log.info(id() + "deliver :" + System.identityHashCode(msg));
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "deliver :" + System.identityHashCode(msg));
+ }
//Check if we have someone to deliver the message to.
_lock.lock();
@@ -288,7 +294,10 @@
if (s == null) //no-one can take the message right now.
{
- _log.info(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Testing Message(" + System.identityHashCode(msg) + ") for Queued Delivery");
+ }
if (!msg.isImmediate())
{
addMessageToQueue(msg);
@@ -297,21 +306,33 @@
_lock.unlock();
//Pre Deliver to all subscriptions
- _log.info(id() + "We have " + _subscriptions.getSubscriptions().size() + " subscribers to give the message to.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "We have " + _subscriptions.getSubscriptions().size() +
+ " subscribers to give the message to.");
+ }
for (Subscription sub : _subscriptions.getSubscriptions())
{
// stop if the message gets delivered whilst PreDelivering if we have a shared queue.
if (_queue.isShared() && msg.getDeliveredToConsumer())
{
- _log.info(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) + ") is already delivered.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Stopping PreDelivery as message(" + System.identityHashCode(msg) +
+ ") is already delivered.");
+ }
continue;
}
// Only give the message to those that want them.
if (sub.hasInterest(msg))
{
- _log.info(id() + "Queuing message(" + System.identityHashCode(msg) + ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Queuing message(" + System.identityHashCode(msg) +
+ ") for PreDelivery for subscriber(" + System.identityHashCode(sub) + ")");
+ }
sub.enqueueForPreDelivery(msg);
}
}
@@ -322,7 +343,11 @@
//release lock now
_lock.unlock();
- _log.info(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" + System.identityHashCode(s) + ") :" + s);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug(id() + "Delivering Message:" + System.identityHashCode(msg) + " to(" +
+ System.identityHashCode(s) + ") :" + s);
+ }
//Deliver the message
s.send(msg, _queue);
}
@@ -330,7 +355,7 @@
finally
{
//ensure lock is released
- if (_lock.isLocked())
+ if (_lock.isHeldByCurrentThread())
{
_lock.unlock();
}
@@ -371,9 +396,12 @@
public void processAsync(Executor executor)
{
- _log.info("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
- " Active:" + _subscriptions.hasActiveSubscribers() +
- " Processing:" + _processing.get());
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Processing Async. Queued:" + hasQueuedMessages() + "(" + getQueueMessageCount() + ")" +
+ " Active:" + _subscriptions.hasActiveSubscribers() +
+ " Processing:" + _processing.get());
+ }
if (hasQueuedMessages() && _subscriptions.hasActiveSubscribers())
{
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Tue Jan 16 07:16:39 2007
@@ -379,7 +379,13 @@
if (!_closed)
{
_logger.info("Closing autoclose subscription:" + this);
- protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(), consumerTag));
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ protocolSession.writeFrame(BasicCancelOkBody.createAMQFrame(channel.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag // consumerTag
+ ));
_closed = true;
}
}
@@ -392,9 +398,17 @@
private ByteBuffer createEncodedDeliverFrame(long deliveryTag, String routingKey, String exchange)
{
- AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(), consumerTag,
- deliveryTag, false, exchange,
- routingKey);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ AMQFrame deliverFrame = BasicDeliverBody.createAMQFrame(channel.getChannelId(),
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ consumerTag, // consumerTag
+ deliveryTag, // deliveryTag
+ exchange, // exchange
+ false, // redelivered
+ routingKey // routingKey
+ );
ByteBuffer buf = ByteBuffer.allocate((int) deliverFrame.getSize()); // XXX: Could cast be a problem?
deliverFrame.writePayload(buf);
buf.flip();
Modified: incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/HeadersBindingTest.java Tue Jan 16 07:16:39 2007
@@ -24,17 +24,18 @@
import java.util.HashMap;
import junit.framework.TestCase;
+import org.apache.qpid.framing.FieldTable;
/**
*/
public class HeadersBindingTest extends TestCase
{
- private Map<String, String> bindHeaders = new HashMap<String, String>();
- private Map<String, String> matchHeaders = new HashMap<String, String>();
+ private FieldTable bindHeaders = new FieldTable();
+ private FieldTable matchHeaders = new FieldTable();
public void testDefault_1()
{
- bindHeaders.put("A", "Value of A");
+ bindHeaders.setString("A", "Value of A");
matchHeaders.put("A", "Value of A");
Modified: incubator/qpid/branches/perftesting/qpid/java/client/pom.xml
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/pom.xml?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/pom.xml (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/pom.xml Tue Jan 16 07:16:39 2007
@@ -35,7 +35,6 @@
<properties>
<topDirectoryLocation>..</topDirectoryLocation>
- <amqj.logging.level>warn</amqj.logging.level>
</properties>
<dependencies>
@@ -96,6 +95,11 @@
<build>
<plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-antrun-plugin</artifactId>
+ </plugin>
+
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
Modified: incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=496725&r1=496724&r2=496725
==============================================================================
--- incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/perftesting/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Jan 16 07:16:39 2007
@@ -465,12 +465,25 @@
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
throws AMQException
{
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+ ChannelOpenBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ null), // outOfBand
+ ChannelOpenOkBody.class);
//todo send low water mark when protocol allows.
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
_protocolHandler.syncWrite(
- BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false),
+ BasicQosBody.createAMQFrame(channelId,
+ (byte)8, (byte)0, // AMQP version (major, minor)
+ false, // global
+ prefetchHigh, // prefetchCount
+ 0), // prefetchSize
BasicQosOkBody.class);
if (transacted)
@@ -479,7 +492,10 @@
{
_logger.debug("Issuing TxSelect for " + channelId);
}
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId), TxSelectOkBody.class);
+ // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
+ // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
+ // Be aware of possible changes to parameter order as versions change.
+ _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, (byte)8, (byte)0), TxSelectOkBody.class);
}
}