You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/21 15:24:40 UTC
svn commit: r489368 - in /incubator/qpid/branches/new_persistence/java:
broker/src/main/java/org/apache/qpid/server/exchange/ client/example/
client/example/src/ client/example/src/main/ client/example/src/main/java/
client/example/src/main/java/org/ c...
Author: rgreig
Date: Thu Dec 21 06:24:38 2006
New Revision: 489368
URL: http://svn.apache.org/viewvc?view=rev&rev=489368
Log:
Merge from trunk up to rev 486165
Added:
incubator/qpid/branches/new_persistence/java/client/example/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/
incubator/qpid/branches/new_persistence/java/client/example/pom.xml
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/pom.xml
incubator/qpid/branches/new_persistence/java/client/example/src/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/
incubator/qpid/branches/new_persistence/java/client/example/src/main/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/log4j.xml
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageDispatcher.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/FileMessageFactory.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MessageFactoryException.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorMessageDispatcher.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/MonitorPublisher.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/Publisher.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/publisher/UndeliveredMessageException.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ConnectionException.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/ContextException.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/FileUtils.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/InitialContextHelper.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/Statics.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/shared/example.properties
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/
- copied from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriber.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/MonitoredSubscriptionWrapper.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/Subscriber.java
incubator/qpid/branches/new_persistence/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/subscriber/SubscriptionWrapper.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
- copied unchanged from r486165, incubator/qpid/trunk/qpid/java/systests/src/test/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
Removed:
incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/example/
Modified:
incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java
incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
Modified: incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java Thu Dec 21 06:24:38 2006
@@ -206,7 +206,18 @@
}
if (!routed)
{
- _logger.warn("Exchange " + getName() + ": message not routable.");
+
+ String msg = "Exchange " + getName() + ": message not routable.";
+
+ if (payload.getPublishBody().mandatory)
+ {
+ throw new NoRouteException(msg, payload);
+ }
+ else
+ {
+ _logger.warn(msg);
+ }
+
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesMessage.java Thu Dec 21 06:24:38 2006
@@ -21,23 +21,20 @@
package org.apache.qpid.client.message;
import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
import javax.jms.MessageEOFException;
-import javax.jms.MessageNotWriteableException;
import java.io.IOException;
import java.nio.charset.Charset;
-import java.nio.charset.CharacterCodingException;
/**
* @author Apache Software Foundation
*/
public abstract class AbstractBytesMessage extends AbstractJMSMessage
-{
+{
/**
* The default initial size of the buffer. The buffer expands automatically.
@@ -79,7 +76,7 @@
{
_data.clear();
}
-
+
public String toBodyString() throws JMSException
{
checkReadable();
@@ -124,7 +121,7 @@
return data;
}
}
-
+
/**
* Check that there is at least a certain number of bytes available to read
*
@@ -138,10 +135,4 @@
throw new MessageEOFException("Unable to read " + len + " bytes");
}
}
-
- public void reset() throws JMSException
- {
- super.reset();
- _data.flip();
- }
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Dec 21 06:24:38 2006
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -384,15 +384,15 @@
}
public void acknowledge() throws JMSException
- {
+ {
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
if (_session != null)
{
if (_session.getAMQConnection().isClosed()){
throw new javax.jms.IllegalStateException("Connection is already closed");
- }
-
+ }
+
// we set multiple to true here since acknowledgement implies acknowledge of all previous messages
// received on the session
_session.acknowledgeMessage(_deliveryTag, true);
@@ -546,7 +546,14 @@
public void reset() throws JMSException
{
- _readableMessage = true;
+ if (_readableMessage)
+ {
+ _data.rewind();
+ }
+ else
+ {
+ _data.flip();
+ _readableMessage = true;
+ }
}
-
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Dec 21 06:24:38 2006
@@ -33,7 +33,7 @@
*/
public class JMSStreamMessage extends AbstractBytesMessage implements StreamMessage
{
- public static final String MIME_TYPE="jms/stream-message";
+ public static final String MIME_TYPE="jms/stream-message";
private static final byte BOOLEAN_TYPE = (byte) 1;
@@ -55,6 +55,8 @@
private static final byte STRING_TYPE = (byte) 10;
+ private static final byte NULL_STRING_TYPE = (byte) 11;
+
/**
* This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
* a byte array in multiple chunks, hence this is used to track how much is left to be read
@@ -89,7 +91,7 @@
return MIME_TYPE;
}
- private byte readAndCheckType() throws MessageFormatException, MessageEOFException,
+ private byte readWireType() throws MessageFormatException, MessageEOFException,
MessageNotReadableException
{
checkReadable();
@@ -105,22 +107,32 @@
public boolean readBoolean() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
boolean result;
- switch (wireType)
+ try
{
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Boolean.parseBoolean(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private boolean readBooleanImpl()
@@ -130,20 +142,30 @@
public byte readByte() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
byte result;
- switch (wireType)
+ try
+ {
+ switch (wireType)
+ {
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Byte.parseByte(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ }
+ }
+ catch (RuntimeException e)
{
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
+ _data.position(position);
+ throw e;
}
return result;
}
@@ -155,24 +177,34 @@
public short readShort() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
short result;
- switch (wireType)
+ try
{
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ switch (wireType)
+ {
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Short.parseShort(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a short");
+ }
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
return result;
}
@@ -190,15 +222,25 @@
*/
public char readChar() throws JMSException
{
- byte wireType = readAndCheckType();
- if (wireType != CHAR_TYPE)
+ int position = _data.position();
+ byte wireType = readWireType();
+ try
{
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ if (wireType != CHAR_TYPE)
+ {
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a char");
+ }
+ else
+ {
+ checkAvailable(2);
+ return readCharImpl();
+ }
}
- else
+ catch (RuntimeException e)
{
- checkAvailable(2);
- return readCharImpl();
+ _data.position(position);
+ throw e;
}
}
@@ -209,30 +251,40 @@
public int readInt() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
int result;
- switch (wireType)
+ try
{
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ switch (wireType)
+ {
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Integer.parseInt(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to an int");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private int readIntImpl()
@@ -242,34 +294,44 @@
public long readLong() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
long result;
- switch (wireType)
+ try
{
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ switch (wireType)
+ {
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Long.parseLong(readStringImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a long");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private long readLongImpl()
@@ -279,22 +341,32 @@
public float readFloat() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
float result;
- switch (wireType)
+ try
+ {
+ switch (wireType)
+ {
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Float.parseFloat(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
{
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
+ _data.position(position);
+ throw e;
}
- return result;
}
private float readFloatImpl()
@@ -304,26 +376,36 @@
public double readDouble() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
double result;
- switch (wireType)
+ try
+ {
+ switch (wireType)
+ {
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = Double.parseDouble(readStringImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
{
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
+ _data.position(position);
+ throw e;
}
- return result;
}
private double readDoubleImpl()
@@ -333,50 +415,63 @@
public String readString() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
String result;
- switch (wireType)
+ try
{
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ switch (wireType)
+ {
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ case NULL_STRING_TYPE:
+ result = null;
+ break;
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readBooleanImpl());
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readLongImpl());
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readIntImpl());
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readShortImpl());
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = String.valueOf(readByteImpl());
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = String.valueOf(readFloatImpl());
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = String.valueOf(readDoubleImpl());
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = String.valueOf(readCharImpl());
+ break;
+ default:
+ _data.position(position);
+ throw new MessageFormatException("Unable to convert " + wireType + " to a String");
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
private String readStringImpl() throws JMSException
@@ -406,7 +501,7 @@
// type discriminator checked separately so you get a MessageFormatException rather than
// an EOF even in the case where both would be applicable
checkAvailable(1);
- byte wireType = readAndCheckType();
+ byte wireType = readWireType();
if (wireType != BYTEARRAY_TYPE)
{
throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
@@ -431,18 +526,25 @@
}
}
}
+ else if (_byteArrayRemaining == 0)
+ {
+ _byteArrayRemaining = -1;
+ return -1;
+ }
- return readBytesImpl(bytes);
+ int returnedSize = readBytesImpl(bytes);
+ if (returnedSize < bytes.length)
+ {
+ _byteArrayRemaining = -1;
+ }
+ return returnedSize;
}
private int readBytesImpl(byte[] bytes)
{
int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
_byteArrayRemaining -= count;
- if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- }
+
if (count == 0)
{
return 0;
@@ -456,62 +558,74 @@
public Object readObject() throws JMSException
{
- byte wireType = readAndCheckType();
+ int position = _data.position();
+ byte wireType = readWireType();
Object result = null;
- switch (wireType)
+ try
{
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
+ switch (wireType)
+ {
+ case BOOLEAN_TYPE:
+ checkAvailable(1);
+ result = readBooleanImpl();
+ break;
+ case BYTE_TYPE:
+ checkAvailable(1);
+ result = readByteImpl();
+ break;
+ case BYTEARRAY_TYPE:
+ checkAvailable(4);
+ int size = _data.getInt();
+ if (size == -1)
+ {
+ result = null;
+ }
+ else
+ {
+ _byteArrayRemaining = size;
+ result = new byte[size];
+ readBytesImpl(new byte[size]);
+ }
+ break;
+ case SHORT_TYPE:
+ checkAvailable(2);
+ result = readShortImpl();
+ break;
+ case CHAR_TYPE:
+ checkAvailable(2);
+ result = readCharImpl();
+ break;
+ case INT_TYPE:
+ checkAvailable(4);
+ result = readIntImpl();
+ break;
+ case LONG_TYPE:
+ checkAvailable(8);
+ result = readLongImpl();
+ break;
+ case FLOAT_TYPE:
+ checkAvailable(4);
+ result = readFloatImpl();
+ break;
+ case DOUBLE_TYPE:
+ checkAvailable(8);
+ result = readDoubleImpl();
+ break;
+ case NULL_STRING_TYPE:
result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- result = new byte[size];
- readBytesImpl(new byte[size]);
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
+ break;
+ case STRING_TYPE:
+ checkAvailable(1);
+ result = readStringImpl();
+ break;
+ }
+ return result;
+ }
+ catch (RuntimeException e)
+ {
+ _data.position(position);
+ throw e;
}
- return result;
}
public void writeBoolean(boolean b) throws JMSException
@@ -564,18 +678,25 @@
public void writeString(String string) throws JMSException
{
- writeTypeDiscriminator(STRING_TYPE);
- try
+ if (string == null)
{
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must write the null terminator ourselves
- _data.put((byte)0);
+ writeTypeDiscriminator(NULL_STRING_TYPE);
}
- catch (CharacterCodingException e)
+ else
{
- JMSException ex = new JMSException("Unable to encode string: " + e);
- ex.setLinkedException(e);
- throw ex;
+ writeTypeDiscriminator(STRING_TYPE);
+ try
+ {
+ _data.putString(string, Charset.forName("UTF-8").newEncoder());
+ // we must write the null terminator ourselves
+ _data.put((byte)0);
+ }
+ catch (CharacterCodingException e)
+ {
+ JMSException ex = new JMSException("Unable to encode string: " + e);
+ ex.setLinkedException(e);
+ throw ex;
+ }
}
}
@@ -601,11 +722,17 @@
public void writeObject(Object object) throws JMSException
{
checkWritable();
+ Class clazz = null;
if (object == null)
{
- throw new NullPointerException("Argument must not be null");
+ // string handles the output of null values
+ clazz = String.class;
+ }
+ else
+ {
+ clazz = object.getClass();
}
- Class clazz = object.getClass();
+
if (clazz == Byte.class)
{
writeByte((Byte) object);
Modified: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindConnectionFactory.java Thu Dec 21 06:24:38 2006
@@ -36,7 +36,7 @@
{
public static final String CONNECTION_FACTORY_BINDING = "amq.ConnectionFactory";
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI";
+ public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
public static final String DEFAULT_CONNECTION_URL = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
@@ -154,7 +154,7 @@
}
catch (NamingException e)
{
-
+ System.out.println("Operation failed: " + e);
}
// Perform the bind
@@ -169,11 +169,11 @@
}
catch (NamingException amqe)
{
-
+ System.out.println("Operation failed: " + amqe);
}
catch (URLSyntaxException e)
{
-
+ System.out.println("Operation failed: " + e);
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindQueue.java Thu Dec 21 06:24:38 2006
@@ -20,24 +20,25 @@
*/
package org.apache.qpid.IBMPerfTest;
-import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQQueue;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
+import org.apache.qpid.client.AMQSession;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Session;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import javax.jms.*;
-import java.util.Hashtable;
import java.io.File;
-import java.net.MalformedURLException;
+import java.util.Hashtable;
public class JNDIBindQueue
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI";
+{
+ public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
@@ -98,7 +99,7 @@
}
catch (JMSException closeE)
{
-
+ System.out.println("Connection closing failed: " + closeE);
}
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/IBMPerfTest/JNDIBindTopic.java Thu Dec 21 06:24:38 2006
@@ -20,24 +20,25 @@
*/
package org.apache.qpid.IBMPerfTest;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.client.AMQTopic;
-import org.apache.log4j.Logger;
-import org.apache.log4j.Level;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
-import java.util.Hashtable;
import java.io.File;
-import java.net.MalformedURLException;
+import java.util.Hashtable;
public class JNDIBindTopic
-{
- public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/IBMPerfTestsJNDI";
+{
+ public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + File.separator + "IBMPerfTestsJNDI";
public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
public static final String FSCONTEXT_FACTORY = "com.sun.jndi.fscontext.RefFSContextFactory";
@@ -99,11 +100,9 @@
}
catch (JMSException closeE)
{
-
+ System.out.println("Operation failed: " + closeE);
}
}
-
-
}
Modified: incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/test/java/org/apache/qpid/test/unit/client/message/StreamMessageTest.java Thu Dec 21 06:24:38 2006
@@ -168,24 +168,6 @@
}
}
- public void testWriteObjectThrowsNPE() throws Exception
- {
- try
- {
- JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
- bm.writeObject(null);
- fail("expected exception did not occur");
- }
- catch (NullPointerException n)
- {
- // ok
- }
- catch (Exception e)
- {
- fail("expected NullPointerException, got " + e);
- }
- }
-
public void testReadBoolean() throws Exception
{
JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
@@ -221,9 +203,34 @@
len = bm.readBytes(bytes);
assertEquals(-1, len);
len = bm.readBytes(bytes);
+ assertEquals(-1, len);
+ len = bm.readBytes(bytes);
assertEquals(0, len);
}
+ public void testReadBytesFollowedByPrimitive() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7, 8});
+ bm.writeBytes(new byte[]{2, 3, 4, 5, 6, 7});
+ bm.writeString("Foo");
+ bm.reset();
+ int len;
+ do
+ {
+ len = bm.readBytes(new byte[2]);
+ }
+ while (len == 2);
+
+ do
+ {
+ len = bm.readBytes(new byte[2]);
+ }
+ while (len == 2);
+
+ assertEquals("Foo", bm.readString());
+ }
+
public void testReadMultipleByteArrays() throws Exception
{
JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
@@ -577,11 +584,11 @@
bm = TestMessageHelper.newJMSStreamMessage();
bm.writeString("2");
bm.reset();
- assertEquals((byte)2, bm.readByte());
+ assertEquals((byte)2, bm.readByte());
bm.reset();
assertEquals((short)2, bm.readShort());
bm.reset();
- assertEquals((int)2, bm.readInt());
+ assertEquals(2, bm.readInt());
bm.reset();
assertEquals((long)2, bm.readLong());
bm = TestMessageHelper.newJMSStreamMessage();
@@ -590,6 +597,16 @@
assertEquals(5.7f, bm.readFloat());
bm.reset();
assertEquals(5.7d, bm.readDouble());
+ }
+
+ public void testNulls() throws Exception
+ {
+ JMSStreamMessage bm = TestMessageHelper.newJMSStreamMessage();
+ bm.writeString(null);
+ bm.writeObject(null);
+ bm.reset();
+ assertNull(bm.readObject());
+ assertNull(bm.readObject());
}
public static junit.framework.Test suite()
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Thu Dec 21 06:24:38 2006
@@ -94,25 +94,45 @@
protected void routeAndTest(Message m, TestQueue... expected) throws AMQException
{
- routeAndTest(m, Arrays.asList(expected));
+ routeAndTest(m, false, Arrays.asList(expected));
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, TestQueue... expected) throws AMQException
+ {
+ routeAndTest(m, expectReturn, Arrays.asList(expected));
}
protected void routeAndTest(Message m, List<TestQueue> expected) throws AMQException
{
- route(m);
- for (TestQueue q : queues)
+ routeAndTest(m, false, expected);
+ }
+
+ protected void routeAndTest(Message m, boolean expectReturn, List<TestQueue> expected) throws AMQException
+ {
+ try
{
- if (expected.contains(q))
+ route(m);
+ assertFalse("Expected "+m+" to be returned due to manadatory flag, and lack of routing",expectReturn);
+ for (TestQueue q : queues)
{
- assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
- }
- else
- {
- assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
- //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ if (expected.contains(q))
+ {
+ assertTrue("Expected " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert m.isInQueue(q) : "Expected " + m + " to be delivered to " + q;
+ }
+ else
+ {
+ assertFalse("Did not expect " + m + " to be delivered to " + q, m.isInQueue(q));
+ //assert !m.isInQueue(q) : "Did not expect " + m + " to be delivered to " + q;
+ }
}
}
+
+ catch (NoRouteException ex)
+ {
+ assertTrue("Expected "+m+" not to be returned",expectReturn);
+ }
+
}
static FieldTable getHeaders(String... entries)
Modified: incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=489368&r1=489367&r2=489368
==============================================================================
--- incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/systests/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Thu Dec 21 06:24:38 2006
@@ -23,6 +23,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.util.TestApplicationRegistry;
+import org.apache.qpid.framing.BasicPublishBody;
public class HeadersExchangeTest extends AbstractHeadersExchangeTestBase
{
@@ -52,6 +53,19 @@
routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"),
q1, q2, q3, q4, q5, q6, q7, q8);
routeAndTest(new Message("Message6", "F0002"));
+
+ Message m7 = new Message("Message7", "XXXXX");
+
+ BasicPublishBody pb7 = m7.getPublishBody();
+ pb7.mandatory = true;
+ routeAndTest(m7,true);
+
+ Message m8 = new Message("Message8", "F0000");
+ BasicPublishBody pb8 = m8.getPublishBody();
+ pb8.mandatory = true;
+ routeAndTest(m8,false,q1);
+
+
}
public void testAny() throws AMQException
@@ -69,6 +83,20 @@
routeAndTest(new Message("Message4", "F0000", "F0001=Bear"), q1, q2, q3, q4, q6);
routeAndTest(new Message("Message5", "F0000=Aardvark", "F0001=Bear"), q1, q2, q3, q4, q6);
routeAndTest(new Message("Message6", "F0002"));
+ }
+
+ public void testMandatory() throws AMQException
+ {
+ TestQueue q1 = bindDefault("F0000");
+ Message m1 = new Message("Message1", "XXXXX");
+ Message m2 = new Message("Message2", "F0000");
+ BasicPublishBody pb1 = m1.getPublishBody();
+ pb1.mandatory = true;
+ BasicPublishBody pb2 = m1.getPublishBody();
+ pb2.mandatory = true;
+ routeAndTest(m1,true);
+
+
}
public static junit.framework.Test suite()