You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ai...@apache.org on 2008/02/28 13:18:14 UTC
svn commit: r631938 [3/5] - in /incubator/qpid/branches/thegreatmerge/qpid:
./ java/ java/broker/ java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/exchange/
java/broker/src/main/java/org/apache/qpid/ser...
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Feb 28 04:16:41 2008
@@ -26,12 +26,7 @@
import org.apache.qpid.client.message.MessageFactoryRegistry;
import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicCancelBody;
-import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.Session;
import org.slf4j.Logger;
@@ -54,13 +49,13 @@
private static final Logger _logger = LoggerFactory.getLogger(BasicMessageConsumer.class);
/** The connection being used by this consumer */
- protected AMQConnection _connection;
+ protected final AMQConnection _connection;
- private String _messageSelector;
+ private final String _messageSelector;
- private boolean _noLocal;
+ private final boolean _noLocal;
- private AMQDestination _destination;
+ private final AMQDestination _destination;
/**
* When true indicates that a blocking receive call is in progress
@@ -75,7 +70,7 @@
protected AMQShortString _consumerTag;
/** We need to know the channel id when constructing frames */
- protected int _channelId;
+ protected final int _channelId;
/**
* Used in the blocking receive methods to receive a message from the Session thread. <p/> Or to notify of errors
@@ -83,40 +78,40 @@
*/
protected final ArrayBlockingQueue _synchronousQueue;
- protected MessageFactoryRegistry _messageFactory;
+ protected final MessageFactoryRegistry _messageFactory;
protected final AMQSession _session;
- protected AMQProtocolHandler _protocolHandler;
+ protected final AMQProtocolHandler _protocolHandler;
/**
* We need to store the "raw" field table so that we can resubscribe in the event of failover being required
*/
- private FieldTable _rawSelectorFieldTable;
+ private final FieldTable _rawSelectorFieldTable;
/**
* We store the high water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchHigh;
+ private final int _prefetchHigh;
/**
* We store the low water prefetch field in order to be able to reuse it when resubscribing in the event of
* failover
*/
- private int _prefetchLow;
+ private final int _prefetchLow;
/**
* We store the exclusive field in order to be able to reuse it when resubscribing in the event of failover
*/
- private boolean _exclusive;
+ private final boolean _exclusive;
/**
* The acknowledge mode in force for this consumer. Note that the AMQP protocol allows different ack modes per
* consumer whereas JMS defines this at the session level, hence why we associate it with the consumer in our
* implementation.
*/
- private int _acknowledgeMode;
+ private final int _acknowledgeMode;
/**
* Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode
@@ -153,10 +148,10 @@
* autoClose denotes that the consumer will automatically cancel itself when there are no more messages to receive
* on the queue. This is used for queue browsing.
*/
- private boolean _autoClose;
+ private final boolean _autoClose;
private boolean _closeWhenNoMessages;
- private boolean _noConsume;
+ private final boolean _noConsume;
private List<StackTraceElement> _closedStack = null;
protected BasicMessageConsumer(int channelId, AMQConnection connection, AMQDestination destination,
@@ -177,7 +172,7 @@
_prefetchHigh = prefetchHigh;
_prefetchLow = prefetchLow;
_exclusive = exclusive;
- _acknowledgeMode = acknowledgeMode;
+
_synchronousQueue = new ArrayBlockingQueue(prefetchHigh, true);
_autoClose = autoClose;
_noConsume = noConsume;
@@ -187,6 +182,10 @@
{
_acknowledgeMode = Session.NO_ACKNOWLEDGE;
}
+ else
+ {
+ _acknowledgeMode = acknowledgeMode;
+ }
}
public AMQDestination getDestination()
@@ -665,9 +664,8 @@
* message listener or a synchronous receive() caller.
*
* @param messageFrame the raw unprocessed mesage
- * @param channelId channel on which this message was sent
*/
- void notifyMessage(UnprocessedMessage messageFrame, int channelId)
+ void notifyMessage(UnprocessedMessage messageFrame)
{
final boolean debug = _logger.isDebugEnabled();
@@ -678,12 +676,7 @@
try
{
- AbstractJMSMessage jmsMessage =
- _messageFactory.createMessage(messageFrame.getDeliveryTag(), messageFrame.isRedelivered(),
- messageFrame.getExchange(), messageFrame.getRoutingKey(),
- (ContentHeaderBody) messageFrame.getContentHeader(),
- messageFrame.getBodies());
-
+ AbstractJMSMessage jmsMessage = createJMSMessageFromUnprocessedMessage(messageFrame);
if (debug)
{
_logger.debug("Message is of type: " + jmsMessage.getClass().getName());
@@ -694,11 +687,9 @@
// if (!_closed.get())
{
- jmsMessage.setConsumer(this);
-
preDeliver(jmsMessage);
- notifyMessage(jmsMessage, channelId);
+ notifyMessage(jmsMessage);
}
// else
// {
@@ -727,9 +718,8 @@
/**
* @param jmsMessage this message has already been processed so can't redo preDeliver
- * @param channelId
*/
- public void notifyMessage(AbstractJMSMessage jmsMessage, int channelId)
+ public void notifyMessage(AbstractJMSMessage jmsMessage)
{
try
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer_0_10.java Thu Feb 28 04:16:41 2008
@@ -135,7 +135,7 @@
}
if (messageOk)
{
- super.notifyMessage(jmsMessage, channelId);
+ super.notifyMessage(jmsMessage);
}
}
@@ -150,7 +150,7 @@
{
int channelId = getSession().getChannelId();
long deliveryId = message.getMessageTransferId();
- String consumerTag = getConsumerTag().toString();
+ AMQShortString consumerTag = getConsumerTag();
AMQShortString exchange;
AMQShortString routingKey;
boolean redelivered = false;
@@ -264,7 +264,7 @@
}
((UnprocessedMessage_0_10) messageFrame).setReplyToURL(replyToUrl);
}
- super.notifyMessage(messageFrame, channelId);
+ super.notifyMessage(messageFrame);
}
public AbstractJMSMessage createJMSMessageFromUnprocessedMessage(
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Feb 28 04:16:41 2008
@@ -553,6 +553,18 @@
frames[0] = publishFrame;
frames[1] = contentHeaderFrame;
CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
+
+ try
+ {
+ _session.checkFlowControl();
+ }
+ catch (InterruptedException e)
+ {
+ JMSException jmsEx = new JMSException("Interrupted while waiting for flow control to be removed");
+ jmsEx.setLinkedException(e);
+ throw jmsEx;
+ }
+
_protocolHandler.writeFrame(compositeFrame, wait);
if (message != origMessage)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicDeliverMethodHandler.java Thu Feb 28 04:16:41 2008
@@ -21,12 +21,12 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicDeliverBody;
-import org.apache.qpid.protocol.AMQMethodEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,7 +45,13 @@
throws AMQException
{
final AMQProtocolSession session = stateManager.getProtocolSession();
- final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(channelId, body);
+ final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(
+ channelId,
+ body.getDeliveryTag(),
+ body.getConsumerTag(),
+ body.getExchange(),
+ body.getRoutingKey(),
+ body.getRedelivered());
_logger.debug("New JmsDeliver method received");
session.unprocessedMessageReceived(msg);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicReturnMethodHandler.java Thu Feb 28 04:16:41 2008
@@ -21,11 +21,15 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.message.ReturnMessage;
+import org.apache.qpid.client.message.UnprocessedMessage;
import org.apache.qpid.client.message.UnprocessedMessage_0_8;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.StateAwareMethodListener;
import org.apache.qpid.framing.BasicReturnBody;
+import org.apache.qpid.protocol.AMQMethodEvent;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,16 +44,20 @@
return _instance;
}
+
public void methodReceived(AMQStateManager stateManager, BasicReturnBody body, int channelId)
- throws AMQException
+ throws AMQException
{
_logger.debug("New JmsBounce method received");
final AMQProtocolSession session = stateManager.getProtocolSession();
- /** FIXME: TGM AS SRSLY 4RL */
- final UnprocessedMessage_0_8 msg = new UnprocessedMessage_0_8(channelId, body);
+ final ReturnMessage msg = new ReturnMessage(channelId,
+ body.getExchange(),
+ body.getRoutingKey(),
+ body.getReplyText(),
+ body.getReplyCode()
+ );
session.unprocessedMessageReceived(msg);
}
-
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Feb 28 04:16:41 2008
@@ -43,6 +43,7 @@
import java.util.Map;
import java.util.UUID;
import java.io.IOException;
+import java.net.URISyntaxException;
public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
@@ -56,8 +57,8 @@
protected boolean _changedData = true;
private Destination _destination;
private JMSHeaderAdapter _headerAdapter;
- private BasicMessageConsumer _consumer;
- private boolean _strictAMQP;
+ private static final boolean STRICT_AMQP_COMPLIANCE =
+ Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
/**
* This is 0_10 specific
@@ -101,6 +102,7 @@
return _010message;
}
+
protected AbstractJMSMessage(ByteBuffer data)
{
super(new BasicContentHeaderProperties());
@@ -115,8 +117,6 @@
_changedData = (data == null);
_headerAdapter = new JMSHeaderAdapter(((BasicContentHeaderProperties) _contentHeaderProperties).getHeaders());
- _strictAMQP =
- Boolean.parseBoolean(System.getProperties().getProperty(AMQSession.STRICT_AMQP, AMQSession.STRICT_AMQP_DEFAULT));
}
protected AbstractJMSMessage(long deliveryTag, BasicContentHeaderProperties contentHeader, AMQShortString exchange,
@@ -164,7 +164,10 @@
{
if (getContentHeaderProperties().getMessageIdAsString() == null)
{
- getContentHeaderProperties().setMessageId("ID:" + UUID.randomUUID());
+ StringBuilder b = new StringBuilder(39);
+ b.append("ID");
+ b.append(UUID.randomUUID());
+ getContentHeaderProperties().setMessageId(b.toString());
}
return getContentHeaderProperties().getMessageIdAsString();
@@ -222,7 +225,7 @@
BindingURL binding = new AMQBindingURL(replyToEncoding);
dest = AMQDestination.createDestination(binding);
}
- catch (URLSyntaxException e)
+ catch (URISyntaxException e)
{
throw new JMSAMQException("Illegal value in JMS_ReplyTo property: " + replyToEncoding, e);
}
@@ -344,7 +347,7 @@
public boolean getBooleanProperty(AMQShortString propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -354,7 +357,7 @@
public boolean getBooleanProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -364,7 +367,7 @@
public byte getByteProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -374,7 +377,7 @@
public byte[] getBytesProperty(AMQShortString propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -384,7 +387,7 @@
public short getShortProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -394,7 +397,7 @@
public int getIntProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -404,7 +407,7 @@
public long getLongProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -414,7 +417,7 @@
public float getFloatProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -424,7 +427,7 @@
public double getDoubleProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -434,12 +437,25 @@
public String getStringProperty(String propertyName) throws JMSException
{
- if (_strictAMQP)
+ if (propertyName.startsWith("JMSX"))
{
- throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ //NOTE: if the JMSX Property is a non AMQP property then we must check _strictAMQP and throw as below.
+ if (propertyName.equals(CustomJMSXProperty.JMSXUserID.toString()))
+ {
+ return ((BasicContentHeaderProperties) _contentHeaderProperties).getUserIdAsString();
+ }
+
+ return null;
}
+ else
+ {
+ if (STRICT_AMQP_COMPLIANCE)
+ {
+ throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
+ }
- return getJmsHeaders().getString(propertyName);
+ return getJmsHeaders().getString(propertyName);
+ }
}
public Object getObjectProperty(String propertyName) throws JMSException
@@ -454,7 +470,7 @@
public void setBooleanProperty(AMQShortString propertyName, boolean b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -465,7 +481,7 @@
public void setBooleanProperty(String propertyName, boolean b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -476,7 +492,7 @@
public void setByteProperty(String propertyName, byte b) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -487,7 +503,7 @@
public void setBytesProperty(AMQShortString propertyName, byte[] bytes) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -498,7 +514,7 @@
public void setShortProperty(String propertyName, short i) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -516,7 +532,7 @@
public void setLongProperty(String propertyName, long l) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -527,7 +543,7 @@
public void setFloatProperty(String propertyName, float f) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -538,7 +554,7 @@
public void setDoubleProperty(String propertyName, double v) throws JMSException
{
- if (_strictAMQP)
+ if (STRICT_AMQP_COMPLIANCE)
{
throw new UnsupportedOperationException("JMS Proprerties not supported in AMQP");
}
@@ -732,11 +748,6 @@
{
return 0;
}
- }
-
- public void setConsumer(BasicMessageConsumer basicMessageConsumer)
- {
- _consumer = basicMessageConsumer;
}
public void receivedFromServer()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Thu Feb 28 04:16:41 2008
@@ -56,7 +56,11 @@
JMSMapMessage(ByteBuffer data) throws JMSException
{
super(data); // this instantiates a content header
- populateMapFromData();
+ if(data != null)
+ {
+ populateMapFromData();
+ }
+
}
JMSMapMessage(long messageNbr, BasicContentHeaderProperties contentHeader, AMQShortString exchange, AMQShortString routingKey,
@@ -77,7 +81,7 @@
public String toBodyString() throws JMSException
{
- return _map.toString();
+ return _map == null ? "" : _map.toString();
}
public AMQShortString getMimeTypeAsShortString()
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/ReturnMessage.java Thu Feb 28 04:16:41 2008
@@ -9,7 +9,7 @@
public ReturnMessage(int channelId,AMQShortString exchange,AMQShortString routingKey,AMQShortString replyText,int replyCode)
{
- super(channelId,-1,"",exchange,routingKey,false);
+ super(channelId,-1,null,exchange,routingKey,false);
_replyText = replyText;
_replyCode = replyCode;
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Thu Feb 28 04:16:41 2008
@@ -36,12 +36,12 @@
{
private final int _channelId;
private final long _deliveryId;
- private final String _consumerTag;
+ private final AMQShortString _consumerTag;
protected AMQShortString _exchange;
protected AMQShortString _routingKey;
protected boolean _redelivered;
- public UnprocessedMessage(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+ public UnprocessedMessage(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
{
_channelId = channelId;
_deliveryId = deliveryId;
@@ -65,7 +65,7 @@
return _deliveryId;
}
- public String getConsumerTag()
+ public AMQShortString getConsumerTag()
{
return _consumerTag;
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_10.java Thu Feb 28 04:16:41 2008
@@ -43,7 +43,7 @@
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ByteBuffer> _bodies = new ArrayList<ByteBuffer>();
- public UnprocessedMessage_0_10(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+ public UnprocessedMessage_0_10(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
{
super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage_0_8.java Thu Feb 28 04:16:41 2008
@@ -47,7 +47,7 @@
/** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
private List<ContentBody> _bodies;
- public UnprocessedMessage_0_8(int channelId,long deliveryId,String consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
+ public UnprocessedMessage_0_8(int channelId,long deliveryId,AMQShortString consumerTag,AMQShortString exchange,AMQShortString routingKey,boolean redelivered)
{
super(channelId,deliveryId,consumerTag,exchange,routingKey,redelivered);
}
@@ -60,7 +60,7 @@
public UnprocessedMessage_0_8(int channelId, BasicDeliverBody body)
{
- super(channelId, body.getDeliveryTag(), body.getConsumerTag().toString(), body.getExchange(), body.getRoutingKey(), false);
+ super(channelId, body.getDeliveryTag(), body.getConsumerTag(), body.getExchange(), body.getRoutingKey(), false);
}
public void receiveBody(ContentBody body)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Thu Feb 28 04:16:41 2008
@@ -361,6 +361,7 @@
// this will attemp failover
sessionClosed(session);
+ _connection.exceptionReceived(cause);
}
// FIXME Need to correctly handle other exceptions. Things like ...
@@ -392,7 +393,7 @@
*/
public void propagateExceptionToWaiters(Exception e)
{
- getStateManager().error(e);
+
if (!_frameListeners.isEmpty())
{
final Iterator it = _frameListeners.iterator();
@@ -424,78 +425,7 @@
HeartbeatDiagnostics.received(bodyFrame instanceof HeartbeatBody);
- switch (bodyFrame.getFrameType())
- {
- case AMQMethodBody.TYPE:
-
- if (debug)
- {
- _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
- }
-
- final AMQMethodEvent<AMQMethodBody> evt =
- new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
-
- try
- {
-
- boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
- }
- }
-
- if (!wasAnyoneInterested)
- {
- throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
- + _frameListeners, null);
- }
- }
- catch (AMQException e)
- {
- getStateManager().error(e);
- if (!_frameListeners.isEmpty())
- {
- Iterator it = _frameListeners.iterator();
- while (it.hasNext())
- {
- final AMQMethodListener listener = (AMQMethodListener) it.next();
- listener.error(e);
- }
- }
-
- exceptionCaught(session, e);
- }
-
- break;
-
- case ContentHeaderBody.TYPE:
-
- _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
- break;
-
- case ContentBody.TYPE:
-
- _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
- break;
-
- case HeartbeatBody.TYPE:
-
- if (debug)
- {
- _logger.debug("Received heartbeat");
- }
-
- break;
-
- default:
-
- }
+ bodyFrame.handle(frame.getChannel(),_protocolSession);
_connection.bytesReceived(_protocolSession.getIoSession().getReadBytes());
}
@@ -511,6 +441,55 @@
// get round a bug in old versions of qpid whereby the connection is not closed
_stateManager.changeState(AMQState.CONNECTION_CLOSED);
}
+ }
+
+ public void methodBodyReceived(final int channelId, final AMQBody bodyFrame, IoSession session)//, final IoSession session)
+ throws AMQException
+ {
+
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + bodyFrame);
+ }
+
+ final AMQMethodEvent<AMQMethodBody> evt =
+ new AMQMethodEvent<AMQMethodBody>(channelId, (AMQMethodBody) bodyFrame);
+
+ try
+ {
+
+ boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+ }
+ }
+
+ if (!wasAnyoneInterested)
+ {
+ throw new AMQException(null, "AMQMethodEvent " + evt + " was not processed by any listener. Listeners:"
+ + _frameListeners, null);
+ }
+ }
+ catch (AMQException e)
+ {
+ if (!_frameListeners.isEmpty())
+ {
+ Iterator it = _frameListeners.iterator();
+ while (it.hasNext())
+ {
+ final AMQMethodListener listener = (AMQMethodListener) it.next();
+ listener.error(e);
+ }
+ }
+
+ exceptionCaught(session, e);
+ }
+
}
private static int _messagesOut;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java Thu Feb 28 04:16:41 2008
@@ -86,7 +86,8 @@
* Maps from a channel id to an unprocessed message. This is used to tie together the JmsDeliverBody (which arrives
* first) with the subsequent content header and content bodies.
*/
- protected ConcurrentMap<Integer,UnprocessedMessage_0_8> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage_0_8>();
+ private final ConcurrentMap<Integer,UnprocessedMessage> _channelId2UnprocessedMsgMap = new ConcurrentHashMap<Integer,UnprocessedMessage>();
+ private final UnprocessedMessage[] _channelId2UnprocessedMsgArray = new UnprocessedMessage[16];
/** Counter to ensure unique queue names */
protected int _queueId = 1;
@@ -104,7 +105,8 @@
private MethodDispatcher _methodDispatcher;
- private final AMQConnection _connection;
+ private final AMQConnection _connection;
+ private static final int FAST_CHANNEL_ACCESS_MASK = 0xFFFFFFF0;
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession protocolSession, AMQConnection connection)
{
@@ -233,14 +235,25 @@
*
* @throws AMQException if this was not expected
*/
- public void unprocessedMessageReceived(UnprocessedMessage_0_8 message) throws AMQException
+ public void unprocessedMessageReceived(UnprocessedMessage message) throws AMQException
{
- _channelId2UnprocessedMsgMap.put(message.getChannelId(), message);
+ final int channelId = message.getChannelId();
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = message;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.put(channelId, message);
+ }
}
- public void messageContentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
+ public void contentHeaderReceived(int channelId, ContentHeaderBody contentHeader) throws AMQException
{
- UnprocessedMessage_0_8 msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
+ final UnprocessedMessage msg = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0 ? _channelId2UnprocessedMsgArray[channelId]
+ : _channelId2UnprocessedMsgMap.get(channelId);
+
+
if (msg == null)
{
throw new AMQException(null, "Error: received content header without having received a BasicDeliver frame first", null);
@@ -258,9 +271,19 @@
}
}
- public void messageContentBodyReceived(int channelId, ContentBody contentBody) throws AMQException
+ public void contentBodyReceived(final int channelId, ContentBody contentBody) throws AMQException
{
- UnprocessedMessage_0_8 msg = _channelId2UnprocessedMsgMap.get(channelId);
+ UnprocessedMessage_0_8 msg;
+ final boolean fastAccess = (channelId & FAST_CHANNEL_ACCESS_MASK) == 0;
+ if(fastAccess)
+ {
+ msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgArray[channelId];
+ }
+ else
+ {
+ msg = (UnprocessedMessage_0_8) _channelId2UnprocessedMsgMap.get(channelId);
+ }
+
if (msg == null)
{
throw new AMQException(null, "Error: received content body without having received a JMSDeliver frame first", null);
@@ -268,7 +291,14 @@
if (msg.getContentHeader() == null)
{
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if(fastAccess)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
throw new AMQException(null, "Error: received content body without having received a ContentHeader frame first", null);
}
@@ -288,6 +318,11 @@
}
}
+ public void heartbeatBodyReceived(int channelId, HeartbeatBody body) throws AMQException
+ {
+
+ }
+
/**
* Deliver a message to the appropriate session, removing the unprocessed message from our map
*
@@ -298,7 +333,14 @@
{
AMQSession session = getSession(channelId);
session.messageReceived(msg);
- _channelId2UnprocessedMsgMap.remove(channelId);
+ if((channelId & FAST_CHANNEL_ACCESS_MASK) == 0)
+ {
+ _channelId2UnprocessedMsgArray[channelId] = null;
+ }
+ else
+ {
+ _channelId2UnprocessedMsgMap.remove(channelId);
+ }
}
protected AMQSession getSession(int channelId)
@@ -488,10 +530,19 @@
final AMQSession session = getSession(channelId);
session.setTicket(ticket);
}
-
-
public void setMethodDispatcher(MethodDispatcher methodDispatcher)
{
_methodDispatcher = methodDispatcher;
+ }
+
+ public void setFlowControl(final int channelId, final boolean active)
+ {
+ final AMQSession session = getSession(channelId);
+ session.setFlowControl(active);
+ }
+
+ public void methodFrameReceived(final int channel, final AMQMethodBody amqMethodBody) throws AMQException
+ {
+ _protocolHandler.methodBodyReceived(channel, amqMethodBody, _minaProtocolSession);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Feb 28 04:16:41 2008
@@ -36,7 +36,7 @@
* The state manager is responsible for managing the state of the protocol session. <p/> For each AMQProtocolHandler
* there is a separate state manager.
*/
-public class AMQStateManager implements AMQMethodListener
+public class AMQStateManager
{
private static final Logger _logger = LoggerFactory.getLogger(AMQStateManager.class);
@@ -51,7 +51,7 @@
* AMQFrame.
*/
- private final CopyOnWriteArraySet _stateListeners = new CopyOnWriteArraySet();
+
private final Object _stateLock = new Object();
private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
@@ -90,19 +90,6 @@
}
}
- public void error(Exception e)
- {
- _logger.debug("State manager receive error notification: " + e);
- synchronized (_stateListeners)
- {
- final Iterator it = _stateListeners.iterator();
- while (it.hasNext())
- {
- final StateListener l = (StateListener) it.next();
- l.error(e);
- }
- }
- }
public <B extends AMQMethodBody> boolean methodReceived(AMQMethodEvent<B> evt) throws AMQException
{
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/client/transport/TransportConnection.java Thu Feb 28 04:16:41 2008
@@ -85,7 +85,7 @@
throw new AMQNoTransportForProtocolException(details, null, null);
}
- if (transport == _currentInstance)
+ /* if (transport == _currentInstance)
{
if (transport == VM)
{
@@ -100,21 +100,23 @@
}
}
- _currentInstance = transport;
+ _currentInstance = transport;*/
+ ITransportConnection instance;
switch (transport)
{
case SOCKET:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
- {
- public IoConnector newSocketConnector()
- {
- return new ExistingSocketConnector();
- }
- });
+ instance =
+ new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ {
+ public IoConnector newSocketConnector()
+ {
+ return new ExistingSocketConnector();
+ }
+ });
break;
case TCP:
- _instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
+ instance = new SocketTransportConnection(new SocketTransportConnection.SocketConnectorFactory()
{
public IoConnector newSocketConnector()
{
@@ -142,12 +144,15 @@
break;
case VM:
{
- _instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
+ instance = getVMTransport(details, Boolean.getBoolean("amqj.AutoCreateVMBroker"));
break;
}
+ default:
+ // FIXME: TGM
+ throw new AMQNoTransportForProtocolException(details, null, null);
}
- return _instance;
+ return instance;
}
private static int getTransport(String transport)
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jms/FailoverPolicy.java Thu Feb 28 04:16:41 2008
@@ -34,7 +34,6 @@
private static final long MINUTE = 60000L;
private static final long DEFAULT_METHOD_TIMEOUT = 1 * MINUTE;
- private static final long DEFAULT_FAILOVER_TIMEOUT = 4 * MINUTE;
private FailoverMethod[] _methods = new FailoverMethod[1];
@@ -161,16 +160,7 @@
}
else
{
- if ((now - _lastFailTime) >= DEFAULT_FAILOVER_TIMEOUT)
- {
- _logger.info("Failover timeout");
-
- return false;
- }
- else
- {
- _lastMethodTime = now;
- }
+ _lastMethodTime = now;
}
}
else
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/main/java/org/apache/qpid/jndi/PropertiesFileInitialContextFactory.java Thu Feb 28 04:16:41 2008
@@ -45,6 +45,7 @@
import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
+import java.net.URISyntaxException;
import java.util.Hashtable;
import java.util.Iterator;
import java.util.Map;
@@ -216,7 +217,7 @@
{
binding = new AMQBindingURL(bindingURL);
}
- catch (URLSyntaxException urlse)
+ catch (URISyntaxException urlse)
{
_logger.warn("Unable to destination:" + urlse);
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/connection/ConnectionTest.java Thu Feb 28 04:16:41 2008
@@ -165,6 +165,22 @@
}
}
+ public void testUnresolvedVirtualHostFailure() throws Exception
+ {
+ try
+ {
+ new AMQConnection("amqp://guest:guest@clientid/rubbishhost?brokerlist='" + _broker + "?retries='0''");
+ fail("Connection should not be established");
+ }
+ catch (AMQException amqe)
+ {
+ if (!(amqe instanceof AMQConnectionFailureException))
+ {
+ fail("Correct exception not thrown. Excpected 'AMQConnectionFailureException' got: " + amqe);
+ }
+ }
+ }
+
public void testClientIdCannotBeChanged() throws Exception
{
Connection connection = new AMQConnection(_broker, "guest", "guest",
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/CloseBeforeAckTest.java Thu Feb 28 04:16:41 2008
@@ -29,8 +29,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import uk.co.thebadgerset.junit.concurrency.TestRunnable;
-import uk.co.thebadgerset.junit.concurrency.ThreadTestCoordinator;
+import org.apache.qpid.junit.concurrency.TestRunnable;
+import org.apache.qpid.junit.concurrency.ThreadTestCoordinator;
import javax.jms.Connection;
import javax.jms.Message;
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/AMQConnectionFailureException.java Thu Feb 28 04:16:41 2008
@@ -1,40 +1,47 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-package org.apache.qpid;
-
-/**
- * AMQConnectionFailureException indicates that a connection to a broker could not be formed.
- *
- * <p/><table id="crc"><caption>CRC Card</caption>
- * <tr><th> Responsibilities <th> Collaborations
- * <tr><td> Represents failure to connect to a broker.
- * </table>
- *
- * @todo Not an AMQP exception as no status code.
- */
-public class AMQConnectionFailureException extends AMQException
-{
- public AMQConnectionFailureException(String message, Throwable cause)
- {
- super(null, message, cause);
- }
-}
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid;
+
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * AMQConnectionFailureException indicates that a connection to a broker could not be formed.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represents failure to connect to a broker.
+ * </table>
+ *
+ * @todo Not an AMQP exception as no status code.
+ */
+public class AMQConnectionFailureException extends AMQException
+{
+ public AMQConnectionFailureException(String message, Throwable cause)
+ {
+ super(null, message, cause);
+ }
+
+ public AMQConnectionFailureException(AMQConstant errorCode, String message, Throwable cause)
+ {
+ super(errorCode, message, cause);
+ }
+}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Thu Feb 28 04:16:41 2008
@@ -22,6 +22,7 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
@@ -48,6 +49,9 @@
*/
public class AMQDecoder extends CumulativeProtocolDecoder
{
+
+ private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -171,4 +175,97 @@
{
_expectProtocolInitiation = expectProtocolInitiation;
}
+
+
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode( IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out ) throws Exception
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( buf != null )
+ {
+ buf.put( in );
+ buf.flip();
+ }
+ else
+ {
+ buf = in;
+ }
+
+ for( ;; )
+ {
+ int oldPos = buf.position();
+ boolean decoded = doDecode( session, buf, out );
+ if( decoded )
+ {
+ if( buf.position() == oldPos )
+ {
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed." );
+ }
+
+ if( !buf.hasRemaining() )
+ {
+ break;
+ }
+ }
+ else
+ {
+ break;
+ }
+ }
+
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if ( buf.hasRemaining() )
+ {
+ storeRemainingInSession( buf, session );
+ }
+ else
+ {
+ removeSessionBuffer( session );
+ }
+ }
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose( IoSession session ) throws Exception
+ {
+ removeSessionBuffer( session );
+ }
+
+ private void removeSessionBuffer(IoSession session)
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ if( buf != null )
+ {
+ buf.release();
+ session.removeAttribute( BUFFER );
+ }
+ }
+
+ private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session)
+ {
+ ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
+ remainingBuf.setAutoExpand( true );
+ remainingBuf.put( buf );
+ session.setAttribute( BUFFER, remainingBuf );
+ }
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Thu Feb 28 04:16:41 2008
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public interface AMQBody
{
@@ -36,4 +38,6 @@
//public void populateFromBuffer(ByteBuffer buffer, long size)
// throws AMQFrameDecodingException, AMQProtocolVersionException;
+
+ void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Thu Feb 28 04:16:41 2008
@@ -27,7 +27,7 @@
private final int _channel;
private final AMQBody _bodyFrame;
-
+ public static final byte FRAME_END_BYTE = (byte) 0xCE;
public AMQFrame(final int channel, final AMQBody bodyFrame)
@@ -47,13 +47,19 @@
return 1 + 2 + 4 + _bodyFrame.getSize() + 1;
}
+ public static final int getFrameOverhead()
+ {
+ return 1 + 2 + 4 + 1;
+ }
+
+
public void writePayload(ByteBuffer buffer)
{
buffer.put(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, _channel);
EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
_bodyFrame.writePayload(buffer);
- buffer.put((byte) 0xCE);
+ buffer.put(FRAME_END_BYTE);
}
public final int getChannel()
@@ -66,10 +72,54 @@
return _bodyFrame;
}
-
-
public String toString()
{
return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
+
+ public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+ {
+ buffer.put(body.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
+ body.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+ {
+ buffer.put(body1.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
+ body1.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
+ body2.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+ {
+ buffer.put(body1.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
+ body1.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
+ body2.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body3.getFrameType());
+ EncodingUtils.writeUnsignedShort(buffer, channel);
+ EncodingUtils.writeUnsignedInteger(buffer, body3.getSize());
+ body3.writePayload(buffer);
+ buffer.put(FRAME_END_BYTE);
+
+ }
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Thu Feb 28 04:16:41 2008
@@ -24,7 +24,9 @@
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
+import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
@@ -84,6 +86,11 @@
public AMQConnectionException getConnectionException(AMQConstant code, String message, Throwable cause)
{
return new AMQConnectionException(code, message, getClazz(), getMethod(), getMajor(), getMinor(), cause);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session) throws AMQException
+ {
+ session.methodFrameReceived(channelId, this);
}
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Thu Feb 28 04:16:41 2008
@@ -26,8 +26,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Map;
-import java.util.WeakHashMap;
+import java.util.*;
import java.lang.ref.WeakReference;
/**
@@ -38,6 +37,62 @@
*/
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
+ private static final byte MINUS = (byte)'-';
+ private static final byte ZERO = (byte) '0';
+
+
+
+ private final class TokenizerImpl implements AMQShortStringTokenizer
+ {
+ private final byte _delim;
+ private int _count = -1;
+ private int _pos = 0;
+
+ public TokenizerImpl(final byte delim)
+ {
+ _delim = delim;
+ }
+
+ public int countTokens()
+ {
+ if(_count == -1)
+ {
+ _count = 1 + AMQShortString.this.occurences(_delim);
+ }
+ return _count;
+ }
+
+ public AMQShortString nextToken()
+ {
+ if(_pos <= AMQShortString.this.length())
+ {
+ int nextDelim = AMQShortString.this.indexOf(_delim, _pos);
+ if(nextDelim == -1)
+ {
+ nextDelim = AMQShortString.this.length();
+ }
+
+ AMQShortString nextToken = AMQShortString.this.substring(_pos, nextDelim++);
+ _pos = nextDelim;
+ return nextToken;
+ }
+ else
+ {
+ return null;
+ }
+ }
+
+ public boolean hasMoreTokens()
+ {
+ return _pos <= AMQShortString.this.length();
+ }
+ }
+
+ private AMQShortString substring(final int from, final int to)
+ {
+ return new AMQShortString(_data, from, to);
+ }
+
private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
@@ -53,7 +108,8 @@
private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
- private final ByteBuffer _data;
+ private final byte[] _data;
+ private final int _offset;
private int _hashCode;
private final int _length;
private static final char[] EMPTY_CHAR_ARRAY = new char[0];
@@ -65,17 +121,25 @@
public AMQShortString(byte[] data)
{
- _data = ByteBuffer.wrap(data);
+ _data = data.clone();
_length = data.length;
+ _offset = 0;
+ }
+
+ public AMQShortString(byte[] data, int pos)
+ {
+ final int size = data[pos++];
+ final byte[] dataCopy = new byte[size];
+ System.arraycopy(data,pos,dataCopy,0,size);
+ _length = size;
+ _data = dataCopy;
+ _offset = 0;
}
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
- if (data != null)
- {
- _hashCode = data.hashCode();
- }
+
}
public AMQShortString(char[] data)
@@ -87,14 +151,17 @@
final int length = data.length;
final byte[] stringBytes = new byte[length];
+ int hash = 0;
for (int i = 0; i < length; i++)
{
stringBytes[i] = (byte) (0xFF & data[i]);
+ hash = (31 * hash) + stringBytes[i];
}
+ _hashCode = hash;
+ _data = stringBytes;
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
_length = length;
+ _offset = 0;
}
@@ -110,20 +177,31 @@
}
- _data = ByteBuffer.wrap(stringBytes);
- _data.rewind();
+ _data = stringBytes;
_hashCode = hash;
_length = length;
+ _offset = 0;
}
- private AMQShortString(ByteBuffer data)
+ private AMQShortString(ByteBuffer data, final int length)
{
- _data = data;
- _length = data.limit();
+ byte[] dataBytes = new byte[length];
+ data.get(dataBytes);
+ _data = dataBytes;
+ _length = length;
+ _offset = 0;
}
+ private AMQShortString(final byte[] data, final int from, final int to)
+ {
+ _offset = from;
+ _length = to - from;
+ _data = data;
+ }
+
+
/**
* Get the length of the short string
* @return length of the underlying byte array
@@ -136,7 +214,7 @@
public char charAt(int index)
{
- return (char) _data.get(index);
+ return (char) _data[_offset + index];
}
@@ -148,27 +226,24 @@
public int writeToByteArray(byte[] encoding, int pos)
{
final int size = length();
- encoding[pos++] = (byte) length();
- for (int i = 0; i < size; i++)
- {
- encoding[pos++] = _data.get(i);
- }
-
- return pos;
+ encoding[pos++] = (byte) size;
+ System.arraycopy(_data,_offset,encoding,pos,size);
+ return pos+size;
}
public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
{
- final byte len = byteEncodedDestination[pos];
- if (len == 0)
+
+ final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+ if(shortString.length() == 0)
{
return null;
}
-
- ByteBuffer data = ByteBuffer.wrap(byteEncodedDestination, pos + 1, len).slice();
-
- return new AMQShortString(data);
+ else
+ {
+ return shortString;
+ }
}
public static AMQShortString readFromBuffer(ByteBuffer buffer)
@@ -180,89 +255,58 @@
}
else
{
- ByteBuffer data = buffer.slice();
- data.limit(length);
- data.rewind();
- buffer.skip(length);
- return new AMQShortString(data);
+ return new AMQShortString(buffer, length);
}
}
public byte[] getBytes()
{
-
- if (_data.buf().hasArray() && (_data.arrayOffset() == 0))
+ if(_offset == 0 && _length == _data.length)
{
- return _data.array();
+ return _data.clone();
}
else
{
- final int size = length();
- byte[] b = new byte[size];
- ByteBuffer buf = _data.duplicate();
- buf.rewind();
- buf.get(b);
-
- return b;
+ byte[] data = new byte[_length];
+ System.arraycopy(_data,_offset,data,0,_length);
+ return data;
}
-
}
public void writeToBuffer(ByteBuffer buffer)
{
final int size = length();
- if (size != 0)
- {
-
- buffer.put((byte) size);
- if (_data.buf().hasArray())
- {
- buffer.put(_data.array(), _data.arrayOffset(), length());
- }
- else
- {
-
- for (int i = 0; i < size; i++)
- {
-
- buffer.put(_data.get(i));
- }
- }
- }
- else
- {
- // really writing out unsigned byte
- buffer.put((byte) 0);
- }
-
+ //buffer.setAutoExpand(true);
+ buffer.put((byte) size);
+ buffer.put(_data, _offset, size);
}
private final class CharSubSequence implements CharSequence
{
- private final int _offset;
+ private final int _sequenceOffset;
private final int _end;
public CharSubSequence(final int offset, final int end)
{
- _offset = offset;
+ _sequenceOffset = offset;
_end = end;
}
public int length()
{
- return _end - _offset;
+ return _end - _sequenceOffset;
}
public char charAt(int index)
{
- return AMQShortString.this.charAt(index + _offset);
+ return AMQShortString.this.charAt(index + _sequenceOffset);
}
public CharSequence subSequence(int start, int end)
{
- return new CharSubSequence(start + _offset, end + _offset);
+ return new CharSubSequence(start + _sequenceOffset, end + _sequenceOffset);
}
}
@@ -273,12 +317,11 @@
final int size = length();
chars = new char[size];
- for (int i = 0; i < size; i++)
- {
- chars[i] = (char) _data.get(i);
+ for (int i = 0; i < size; i++)
+ {
+ chars[i] = (char) _data[i + _offset];
}
- }
-
+ }
return chars;
}
@@ -294,6 +337,17 @@
public boolean equals(Object o)
{
+
+
+ if(o instanceof AMQShortString)
+ {
+ return equals((AMQShortString)o);
+ }
+ if(o instanceof CharSequence)
+ {
+ return equals((CharSequence)o);
+ }
+
if (o == null)
{
return false;
@@ -304,26 +358,40 @@
return true;
}
- if (o instanceof AMQShortString)
- {
- final AMQShortString otherString = (AMQShortString) o;
+ return false;
- if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
- {
- return false;
- }
+ }
- return _data.equals(otherString._data);
+ public boolean equals(final AMQShortString otherString)
+ {
+ if (otherString == this)
+ {
+ return true;
+ }
+
+ if (otherString == null)
+ {
+ return false;
+ }
+ if ((_hashCode != 0) && (otherString._hashCode != 0) && (_hashCode != otherString._hashCode))
+ {
+ return false;
}
- return (o instanceof CharSequence) && equals((CharSequence) o);
+ return (_offset == 0 && otherString._offset == 0 && _length == _data.length && otherString._length == otherString._data.length && Arrays.equals(_data,otherString._data))
+ || Arrays.equals(getBytes(),otherString.getBytes());
}
public boolean equals(CharSequence s)
{
+ if(s instanceof AMQShortString)
+ {
+ return equals((AMQShortString)s);
+ }
+
if (s == null)
{
return false;
@@ -354,7 +422,7 @@
for (int i = 0; i < size; i++)
{
- hash = (31 * hash) + _data.get(i);
+ hash = (31 * hash) + _data[i+_offset];
}
_hashCode = hash;
@@ -389,8 +457,8 @@
for (int i = 0; i < length(); i++)
{
- final byte d = _data.get(i);
- final byte n = name._data.get(i);
+ final byte d = _data[i+_offset];
+ final byte n = name._data[i+name._offset];
if (d < n)
{
return -1;
@@ -406,6 +474,12 @@
}
}
+ public AMQShortStringTokenizer tokenize(byte delim)
+ {
+ return new TokenizerImpl(delim);
+ }
+
+
public AMQShortString intern()
{
@@ -443,4 +517,111 @@
return internString;
}
+
+ private int occurences(final byte delim)
+ {
+ int count = 0;
+ final int end = _offset + _length;
+ for(int i = _offset ; i < end ; i++ )
+ {
+ if(_data[i] == delim)
+ {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ private int indexOf(final byte val, final int pos)
+ {
+
+ for(int i = pos; i < length(); i++)
+ {
+ if(_data[_offset+i] == val)
+ {
+ return i;
+ }
+ }
+ return -1;
+ }
+
+
+ public static AMQShortString join(final Collection<AMQShortString> terms,
+ final AMQShortString delim)
+ {
+ if(terms.size() == 0)
+ {
+ return EMPTY_STRING;
+ }
+
+ int size = delim.length() * (terms.size() - 1);
+ for(AMQShortString term : terms)
+ {
+ size += term.length();
+ }
+
+ byte[] data = new byte[size];
+ int pos = 0;
+ final byte[] delimData = delim._data;
+ final int delimOffset = delim._offset;
+ final int delimLength = delim._length;
+
+
+ for(AMQShortString term : terms)
+ {
+
+ if(pos!=0)
+ {
+ System.arraycopy(delimData, delimOffset,data,pos, delimLength);
+ pos+=delimLength;
+ }
+ System.arraycopy(term._data,term._offset,data,pos,term._length);
+ pos+=term._length;
+ }
+
+
+
+ return new AMQShortString(data,0,size);
+ }
+
+ public int toIntValue()
+ {
+ int pos = 0;
+ int val = 0;
+
+
+ boolean isNegative = (_data[pos] == MINUS);
+ if(isNegative)
+ {
+ pos++;
+ }
+ while(pos < _length)
+ {
+ int digit = (int) (_data[pos++] - ZERO);
+ if((digit < 0) || (digit > 9))
+ {
+ throw new NumberFormatException("\""+toString()+"\" is not a valid number");
+ }
+ val = val * 10;
+ val += digit;
+ }
+ if(isNegative)
+ {
+ val = val * -1;
+ }
+ return val;
+ }
+
+ public boolean contains(final byte b)
+ {
+ for(int i = 0; i < _length; i++)
+ {
+ if(_data[i] == b)
+ {
+ return true;
+ }
+ }
+ return false; //To change body of created methods use File | Settings | File Templates.
+ }
+
}
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/CompositeAMQDataBlock.java Thu Feb 28 04:16:41 2008
@@ -24,7 +24,6 @@
public class CompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
- private AMQDataBlock _firstFrame;
private AMQDataBlock[] _blocks;
@@ -33,27 +32,12 @@
_blocks = blocks;
}
- /**
- * The encoded block will be logically first before the AMQDataBlocks which are encoded
- * into the buffer afterwards.
- * @param encodedBlock already-encoded data
- * @param blocks some blocks to be encoded.
- */
- public CompositeAMQDataBlock(AMQDataBlock encodedBlock, AMQDataBlock[] blocks)
- {
- this(blocks);
- _firstFrame = encodedBlock;
- }
public AMQDataBlock[] getBlocks()
{
return _blocks;
}
- public AMQDataBlock getFirstFrame()
- {
- return _firstFrame;
- }
public long getSize()
{
@@ -62,19 +46,11 @@
{
frameSize += _blocks[i].getSize();
}
- if (_firstFrame != null)
- {
- frameSize += _firstFrame.getSize();
- }
return frameSize;
}
public void writePayload(ByteBuffer buffer)
{
- if (_firstFrame != null)
- {
- _firstFrame.writePayload(buffer);
- }
for (int i = 0; i < _blocks.length; i++)
{
_blocks[i].writePayload(buffer);
@@ -90,7 +66,7 @@
else
{
StringBuilder buf = new StringBuilder(this.getClass().getName());
- buf.append("{encodedBlock=").append(_firstFrame);
+ buf.append("{");
for (int i = 0 ; i < _blocks.length; i++)
{
buf.append(" ").append(i).append("=[").append(_blocks[i].toString()).append("]");
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Thu Feb 28 04:16:41 2008
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class ContentBody implements AMQBody
{
@@ -66,6 +68,12 @@
ByteBuffer copy = payload.duplicate();
buffer.put(copy.rewind());
}
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentBodyReceived(channelId, this);
}
protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentHeaderBody.java Thu Feb 28 04:16:41 2008
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class ContentHeaderBody implements AMQBody
{
@@ -108,6 +110,12 @@
buffer.putLong(bodySize);
EncodingUtils.writeUnsignedShort(buffer, properties.getPropertyFlags());
properties.writePropertyListPayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.contentHeaderReceived(channelId, this);
}
public static AMQFrame createAMQFrame(int channelId, int classId, int weight, BasicContentHeaderProperties properties,
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Thu Feb 28 04:16:41 2008
@@ -21,6 +21,8 @@
package org.apache.qpid.framing;
import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.AMQException;
public class HeartbeatBody implements AMQBody
{
@@ -53,6 +55,12 @@
public void writePayload(ByteBuffer buffer)
{
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession session)
+ throws AMQException
+ {
+ session.heartbeatBodyReceived(channelId, this);
}
protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
Modified: incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=631938&r1=631937&r2=631938&view=diff
==============================================================================
--- incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ incubator/qpid/branches/thegreatmerge/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Thu Feb 28 04:16:41 2008
@@ -70,7 +70,7 @@
return new MethodConverter_0_9.MessagePublishInfoImpl(exchange,
publishBody.getImmediate(),
publishBody.getMandatory(),
- routingKey == null ? null : routingKey.intern());
+ routingKey);
}