You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/19 17:13:38 UTC
svn commit: r1172657 [13/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/
cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/
cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/
cpp/bindings/qpid/dotnet/examples/csha...
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XAResourceImpl.java Mon Sep 19 15:13:18 2011
@@ -21,10 +21,14 @@ import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
-import org.apache.qpid.AMQInvalidArgumentException;
import org.apache.qpid.dtx.XidImpl;
-import org.apache.qpid.transport.*;
-
+import org.apache.qpid.transport.DtxXaStatus;
+import org.apache.qpid.transport.ExecutionErrorCode;
+import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.Option;
+import org.apache.qpid.transport.RecoverResult;
+import org.apache.qpid.transport.SessionException;
+import org.apache.qpid.transport.XaResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -211,9 +215,28 @@ public class XAResourceImpl implements X
* @throws XAException An error has occurred. Possible exception values are XAER_RMERR, XAER_RMFAIL.
*/
public boolean isSameRM(XAResource xaResource) throws XAException
- {
- // TODO : get the server identity of xaResource and compare it with our own one
- return false;
+ {
+ if(this == xaResource)
+ {
+ return true;
+ }
+ if(!(xaResource instanceof XAResourceImpl))
+ {
+ return false;
+ }
+
+ XAResourceImpl other = (XAResourceImpl)xaResource;
+
+ String myUUID = ((AMQSession_0_10)_xaSession).getAMQConnection().getBrokerUUID();
+ String otherUUID = ((AMQSession_0_10)other._xaSession).getAMQConnection().getBrokerUUID();
+
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Comparing my UUID " + myUUID + " with other UUID " + otherUUID);
+ }
+
+ return (myUUID != null && otherUUID != null && myUUID.equals(otherUUID));
+
}
/**
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/XASessionImpl.java Mon Sep 19 15:13:18 2011
@@ -52,7 +52,7 @@ public class XASessionImpl extends AMQSe
{
super(qpidConnection, con, channelId, false, // this is not a transacted session
Session.AUTO_ACKNOWLEDGE, // the ack mode is transacted
- MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+ MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow,null);
createSession();
_xaResource = new XAResourceImpl(this);
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java Mon Sep 19 15:13:18 2011
@@ -59,8 +59,8 @@ import org.slf4j.LoggerFactory;
* <tr><td> Automatically retry the continuation accross fail-overs until it succeeds, or raises an exception.
* </table>
*
- * @todo Another continuation. Could use an interface Continuation (as described in other todos, for example, see
- * {@link org.apache.qpid.pool.Job}). Then have a wrapping continuation (this), which blocks on an arbitrary
+ * @todo Another continuation. Could use an interface Continuation (as described in other todos)
+ * Then have a wrapping continuation (this), which blocks on an arbitrary
* Condition or Latch (specified in constructor call), that this blocks on before calling the wrapped Continuation.
* Must work on Java 1.4, so check retrotranslator works on Lock/Condition or latch first. Argument and return type
* to match wrapped condition as type parameters. Rename to AsyncConditionalContinuation or something like that.
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ConnectionStartMethodHandler.java Mon Sep 19 15:13:18 2011
@@ -20,6 +20,13 @@
*/
package org.apache.qpid.client.handler;
+import java.io.UnsupportedEncodingException;
+import java.util.StringTokenizer;
+
+import javax.security.sasl.Sasl;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
import org.apache.qpid.client.security.AMQCallbackHandler;
@@ -34,18 +41,9 @@ import org.apache.qpid.framing.Connectio
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.FieldTableFactory;
import org.apache.qpid.framing.ProtocolVersion;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
-
-import java.io.UnsupportedEncodingException;
-import java.util.HashSet;
-import java.util.StringTokenizer;
-
public class ConnectionStartMethodHandler implements StateAwareMethodListener<ConnectionStartBody>
{
private static final Logger _log = LoggerFactory.getLogger(ConnectionStartMethodHandler.class);
@@ -197,40 +195,20 @@ public class ConnectionStartMethodHandle
private String chooseMechanism(byte[] availableMechanisms) throws UnsupportedEncodingException
{
final String mechanisms = new String(availableMechanisms, "utf8");
- StringTokenizer tokenizer = new StringTokenizer(mechanisms, " ");
- HashSet mechanismSet = new HashSet();
- while (tokenizer.hasMoreTokens())
- {
- mechanismSet.add(tokenizer.nextToken());
- }
-
- String preferredMechanisms = CallbackHandlerRegistry.getInstance().getMechanisms();
- StringTokenizer prefTokenizer = new StringTokenizer(preferredMechanisms, " ");
- while (prefTokenizer.hasMoreTokens())
- {
- String mech = prefTokenizer.nextToken();
- if (mechanismSet.contains(mech))
- {
- return mech;
- }
- }
-
- return null;
+ return CallbackHandlerRegistry.getInstance().selectMechanism(mechanisms);
}
private AMQCallbackHandler createCallbackHandler(String mechanism, AMQProtocolSession protocolSession)
throws AMQException
{
- Class mechanismClass = CallbackHandlerRegistry.getInstance().getCallbackHandlerClass(mechanism);
try
{
- Object instance = mechanismClass.newInstance();
- AMQCallbackHandler cbh = (AMQCallbackHandler) instance;
- cbh.initialise(protocolSession.getAMQConnection().getConnectionURL());
+ AMQCallbackHandler instance = CallbackHandlerRegistry.getInstance().createCallbackHandler(mechanism);
+ instance.initialise(protocolSession.getAMQConnection().getConnectionURL());
- return cbh;
+ return instance;
}
- catch (Exception e)
+ catch (IllegalArgumentException e)
{
throw new AMQException(null, "Unable to create callback handler: " + e, e);
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegateFactory.java Mon Sep 19 15:13:18 2011
@@ -21,11 +21,6 @@
package org.apache.qpid.client.message;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.AMQException;
-
public interface AMQMessageDelegateFactory<D extends AMQMessageDelegate>
{
public static AMQMessageDelegateFactory DEFAULT_FACTORY = null;
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Mon Sep 19 15:13:18 2011
@@ -55,6 +55,9 @@ import org.apache.qpid.transport.Message
import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.TransportException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* This extends AbstractAMQMessageDelegate which contains common code between
@@ -63,6 +66,7 @@ import org.apache.qpid.transport.ReplyTo
*/
public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
+ private static final Logger _logger = LoggerFactory.getLogger(AMQMessageDelegate_0_10.class);
private static final Map<ReplyTo, SoftReference<Destination>> _destinationCache = Collections.synchronizedMap(new HashMap<ReplyTo, SoftReference<Destination>>());
public static final String JMS_TYPE = "x-jms-type";
@@ -95,8 +99,22 @@ public class AMQMessageDelegate_0_10 ext
AMQDestination dest;
- dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
+ if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
+ {
+ dest = generateDestination(new AMQShortString(_deliveryProps.getExchange()),
new AMQShortString(_deliveryProps.getRoutingKey()));
+ }
+ else
+ {
+ String subject = null;
+ if (messageProps != null && messageProps.getApplicationHeaders() != null)
+ {
+ subject = (String)messageProps.getApplicationHeaders().get(QpidMessageProperties.QPID_SUBJECT);
+ }
+ dest = (AMQDestination) convertToAddressBasedDestination(_deliveryProps.getExchange(),
+ _deliveryProps.getRoutingKey(), subject);
+ }
+
setJMSDestination(dest);
}
@@ -242,13 +260,50 @@ public class AMQMessageDelegate_0_10 ext
String exchange = replyTo.getExchange();
String routingKey = replyTo.getRoutingKey();
- dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ if (AMQDestination.getDefaultDestSyntax() == AMQDestination.DestSyntax.BURL)
+ {
+
+ dest = generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ }
+ else
+ {
+ dest = convertToAddressBasedDestination(exchange,routingKey,null);
+ }
_destinationCache.put(replyTo, new SoftReference<Destination>(dest));
}
return dest;
}
}
+
+ private Destination convertToAddressBasedDestination(String exchange, String routingKey, String subject)
+ {
+ String addr;
+ if ("".equals(exchange)) // type Queue
+ {
+ subject = (subject == null) ? "" : "/" + subject;
+ addr = routingKey + subject;
+ }
+ else
+ {
+ addr = exchange + "/" + routingKey;
+ }
+
+ try
+ {
+ return AMQDestination.createDestination("ADDR:" + addr.toString());
+ }
+ catch(Exception e)
+ {
+ // An exception is only thrown here if the address syntax is invalid.
+ // Logging the exception, but not throwing as this is only important to Qpid developers.
+ // An exception here means a bug in the code.
+ _logger.error("Exception when constructing an address string from the ReplyTo struct");
+
+ // falling back to the old way of doing it to ensure the application continues.
+ return generateDestination(new AMQShortString(exchange), new AMQShortString(routingKey));
+ }
+ }
public void setJMSReplyTo(Destination destination) throws JMSException
{
@@ -287,6 +342,14 @@ public class AMQMessageDelegate_0_10 ext
e.setLinkedException(ex);
throw e;
}
+ catch (TransportException e)
+ {
+ JMSException jmse = new JMSException("Exception occured while figuring out the node type:" + e.getMessage());
+ jmse.initCause(e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+
}
final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
@@ -337,7 +400,7 @@ public class AMQMessageDelegate_0_10 ext
Destination replyTo = getJMSReplyTo();
if(replyTo != null)
{
- return ((AMQDestination)replyTo).toURL();
+ return ((AMQDestination)replyTo).toString();
}
else
{
@@ -634,11 +697,16 @@ public class AMQMessageDelegate_0_10 ext
{
return new String(_messageProps.getUserId());
}
- else if ("x-amqp-0-10.app-id".equals(propertyName) &&
+ else if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName) &&
_messageProps.getAppId() != null)
{
return new String(_messageProps.getAppId());
}
+ else if (QpidMessageProperties.AMQP_0_10_ROUTING_KEY.equals(propertyName) &&
+ _deliveryProps.getRoutingKey() != null)
+ {
+ return _deliveryProps.getRoutingKey();
+ }
else
{
checkPropertyName(propertyName);
@@ -745,7 +813,7 @@ public class AMQMessageDelegate_0_10 ext
{
checkPropertyName(propertyName);
checkWritableProperties();
- if ("x-amqp-0-10.app-id".equals(propertyName))
+ if (QpidMessageProperties.AMQP_0_10_APP_ID.equals(propertyName))
{
_messageProps.setAppId(value.getBytes());
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Mon Sep 19 15:13:18 2011
@@ -499,7 +499,6 @@ public class AMQMessageDelegate_0_8 exte
{
throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
}
- _contentHeaderProperties.updated();
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java Mon Sep 19 15:13:18 2011
@@ -24,11 +24,11 @@ package org.apache.qpid.client.message;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.transport.codec.BBDecoder;
import org.apache.qpid.transport.codec.BBEncoder;
@@ -81,18 +81,19 @@ public class AMQPEncodedMapMessage exten
@ Override
public ByteBuffer getData()
{
- writeMapToData();
- return _data;
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeMap(_map);
+ return encoder.segment();
}
@ Override
- protected void populateMapFromData() throws JMSException
+ protected void populateMapFromData(ByteBuffer data) throws JMSException
{
- if (_data != null)
+ if (data != null)
{
- _data.rewind();
+ data.rewind();
BBDecoder decoder = new BBDecoder();
- decoder.init(_data.buf());
+ decoder.init(data);
_map = decoder.readMap();
}
else
@@ -101,16 +102,8 @@ public class AMQPEncodedMapMessage exten
}
}
- @ Override
- protected void writeMapToData()
- {
- BBEncoder encoder = new BBEncoder(1024);
- encoder.writeMap(_map);
- _data = ByteBuffer.wrap(encoder.segment());
- }
-
// for testing
- Map<String,Object> getMap()
+ public Map<String,Object> getMap()
{
return _map;
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -1,6 +1,6 @@
package org.apache.qpid.client.message;
/*
- *
+ *
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -8,22 +8,23 @@ package org.apache.qpid.client.message;
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
- *
+ *
*/
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
+
import org.apache.qpid.AMQException;
public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
@@ -36,7 +37,7 @@ public class AMQPEncodedMapMessageFactor
return new AMQPEncodedMapMessage(delegate,data);
}
- @Override
+
public AbstractJMSMessage createMessage(
AMQMessageDelegateFactory delegateFactory) throws JMSException
{
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractBytesTypedMessage.java Mon Sep 19 15:13:18 2011
@@ -21,784 +21,96 @@
package org.apache.qpid.client.message;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
-import javax.jms.MessageEOFException;
-import javax.jms.MessageFormatException;
import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.transport.util.Functions;
/**
* @author Apache Software Foundation
*/
-public abstract class AbstractBytesTypedMessage extends AbstractBytesMessage
+public abstract class AbstractBytesTypedMessage extends AbstractJMSMessage
{
+ protected boolean _readableMessage = false;
- protected static final byte BOOLEAN_TYPE = (byte) 1;
-
- protected static final byte BYTE_TYPE = (byte) 2;
-
- protected static final byte BYTEARRAY_TYPE = (byte) 3;
-
- protected static final byte SHORT_TYPE = (byte) 4;
-
- protected static final byte CHAR_TYPE = (byte) 5;
-
- protected static final byte INT_TYPE = (byte) 6;
-
- protected static final byte LONG_TYPE = (byte) 7;
-
- protected static final byte FLOAT_TYPE = (byte) 8;
-
- protected static final byte DOUBLE_TYPE = (byte) 9;
-
- protected static final byte STRING_TYPE = (byte) 10;
-
- protected static final byte NULL_STRING_TYPE = (byte) 11;
-
- /**
- * This is set when reading a byte array. The readBytes(byte[]) method supports multiple calls to read
- * a byte array in multiple chunks, hence this is used to track how much is left to be read
- */
- private int _byteArrayRemaining = -1;
-
- AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory)
- {
-
- this(delegateFactory, null);
- }
-
- /**
- * Construct a stream message with existing data.
- *
- * @param delegateFactory
- * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- */
- AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+ AbstractBytesTypedMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedMessage)
{
- super(delegateFactory, data); // this instanties a content header
+ super(delegateFactory, fromReceivedMessage); // this instanties a content header
+ _readableMessage = fromReceivedMessage;
}
- AbstractBytesTypedMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ AbstractBytesTypedMessage(AMQMessageDelegate delegate, boolean fromReceivedMessage) throws AMQException
{
- super(delegate, data);
- }
+ super(delegate, fromReceivedMessage);
+ _readableMessage = fromReceivedMessage;
-
- protected byte readWireType() throws MessageFormatException, MessageEOFException,
- MessageNotReadableException
- {
- checkReadable();
- checkAvailable(1);
- return _data.get();
}
- protected void writeTypeDiscriminator(byte type) throws MessageNotWriteableException
+ protected void checkReadable() throws MessageNotReadableException
{
- checkWritable();
- _data.put(type);
- _changedData = true;
- }
-
- protected boolean readBoolean() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- boolean result;
- try
+ if (!_readableMessage)
{
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Boolean.parseBoolean(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a boolean");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
+ throw new MessageNotReadableException("You need to call reset() to make the message readable");
}
}
- private boolean readBooleanImpl()
+ @Override
+ protected void checkWritable() throws MessageNotWriteableException
{
- return _data.get() != 0;
- }
-
- protected byte readByte() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- byte result;
- try
+ super.checkWritable();
+ if(_readableMessage)
{
- switch (wireType)
- {
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Byte.parseByte(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte");
- }
+ throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
}
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
}
- private byte readByteImpl()
+ public void clearBody() throws JMSException
{
- return _data.get();
+ super.clearBody();
+ _readableMessage = false;
}
- protected short readShort() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- short result;
- try
- {
- switch (wireType)
- {
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Short.parseShort(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a short");
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- return result;
- }
- private short readShortImpl()
+ public String toBodyString() throws JMSException
{
- return _data.getShort();
- }
-
- /**
- * Note that this method reads a unicode character as two bytes from the stream
- *
- * @return the character read from the stream
- * @throws javax.jms.JMSException
- */
- protected char readChar() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
try
{
- if(wireType == NULL_STRING_TYPE){
- throw new NullPointerException();
+ ByteBuffer data = getData();
+ if (data != null)
+ {
+ return Functions.str(data, 100, 0);
+ }
+ else
+ {
+ return "";
}
- if (wireType != CHAR_TYPE)
- {
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a char");
- }
- else
- {
- checkAvailable(2);
- return readCharImpl();
- }
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private char readCharImpl()
- {
- return _data.getChar();
- }
-
- protected int readInt() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- int result;
- try
- {
- switch (wireType)
- {
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Integer.parseInt(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to an int");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected int readIntImpl()
- {
- return _data.getInt();
- }
-
- protected long readLong() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- long result;
- try
- {
- switch (wireType)
- {
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Long.parseLong(readStringImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a long");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private long readLongImpl()
- {
- return _data.getLong();
- }
-
- protected float readFloat() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- float result;
- try
- {
- switch (wireType)
- {
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Float.parseFloat(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a float");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private float readFloatImpl()
- {
- return _data.getFloat();
- }
-
- protected double readDouble() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- double result;
- try
- {
- switch (wireType)
- {
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = Double.parseDouble(readStringImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a double");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- private double readDoubleImpl()
- {
- return _data.getDouble();
- }
-
- protected String readString() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- String result;
- try
- {
- switch (wireType)
- {
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- throw new NullPointerException("data is null");
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = String.valueOf(readBooleanImpl());
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = String.valueOf(readLongImpl());
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readIntImpl());
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = String.valueOf(readShortImpl());
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = String.valueOf(readByteImpl());
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = String.valueOf(readFloatImpl());
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = String.valueOf(readDoubleImpl());
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = String.valueOf(readCharImpl());
- break;
- default:
- _data.position(position);
- throw new MessageFormatException("Unable to convert " + wireType + " to a String");
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected String readStringImpl() throws JMSException
- {
- try
- {
- return _data.getString(Charset.forName("UTF-8").newDecoder());
}
- catch (CharacterCodingException e)
+ catch (Exception e)
{
- JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
+ JMSException jmse = new JMSException(e.toString());
jmse.setLinkedException(e);
jmse.initCause(e);
throw jmse;
}
- }
-
- protected int readBytes(byte[] bytes) throws JMSException
- {
- if (bytes == null)
- {
- throw new IllegalArgumentException("byte array must not be null");
- }
- checkReadable();
- // first call
- if (_byteArrayRemaining == -1)
- {
- // type discriminator checked separately so you get a MessageFormatException rather than
- // an EOF even in the case where both would be applicable
- checkAvailable(1);
- byte wireType = readWireType();
- if (wireType != BYTEARRAY_TYPE)
- {
- throw new MessageFormatException("Unable to convert " + wireType + " to a byte array");
- }
- checkAvailable(4);
- int size = _data.getInt();
- // length of -1 indicates null
- if (size == -1)
- {
- return -1;
- }
- else
- {
- if (size > _data.remaining())
- {
- throw new MessageEOFException("Byte array has stated length " + size + " but message only contains " +
- _data.remaining() + " bytes");
- }
- else
- {
- _byteArrayRemaining = size;
- }
- }
- }
- else if (_byteArrayRemaining == 0)
- {
- _byteArrayRemaining = -1;
- return -1;
- }
-
- int returnedSize = readBytesImpl(bytes);
- if (returnedSize < bytes.length)
- {
- _byteArrayRemaining = -1;
- }
- return returnedSize;
- }
-
- private int readBytesImpl(byte[] bytes)
- {
- int count = (_byteArrayRemaining >= bytes.length ? bytes.length : _byteArrayRemaining);
- _byteArrayRemaining -= count;
-
- if (count == 0)
- {
- return 0;
- }
- else
- {
- _data.get(bytes, 0, count);
- return count;
- }
- }
-
- protected Object readObject() throws JMSException
- {
- int position = _data.position();
- byte wireType = readWireType();
- Object result = null;
- try
- {
- switch (wireType)
- {
- case BOOLEAN_TYPE:
- checkAvailable(1);
- result = readBooleanImpl();
- break;
- case BYTE_TYPE:
- checkAvailable(1);
- result = readByteImpl();
- break;
- case BYTEARRAY_TYPE:
- checkAvailable(4);
- int size = _data.getInt();
- if (size == -1)
- {
- result = null;
- }
- else
- {
- _byteArrayRemaining = size;
- byte[] bytesResult = new byte[size];
- readBytesImpl(bytesResult);
- result = bytesResult;
- }
- break;
- case SHORT_TYPE:
- checkAvailable(2);
- result = readShortImpl();
- break;
- case CHAR_TYPE:
- checkAvailable(2);
- result = readCharImpl();
- break;
- case INT_TYPE:
- checkAvailable(4);
- result = readIntImpl();
- break;
- case LONG_TYPE:
- checkAvailable(8);
- result = readLongImpl();
- break;
- case FLOAT_TYPE:
- checkAvailable(4);
- result = readFloatImpl();
- break;
- case DOUBLE_TYPE:
- checkAvailable(8);
- result = readDoubleImpl();
- break;
- case NULL_STRING_TYPE:
- result = null;
- break;
- case STRING_TYPE:
- checkAvailable(1);
- result = readStringImpl();
- break;
- }
- return result;
- }
- catch (RuntimeException e)
- {
- _data.position(position);
- throw e;
- }
- }
-
- protected void writeBoolean(boolean b) throws JMSException
- {
- writeTypeDiscriminator(BOOLEAN_TYPE);
- _data.put(b ? (byte) 1 : (byte) 0);
- }
-
- protected void writeByte(byte b) throws JMSException
- {
- writeTypeDiscriminator(BYTE_TYPE);
- _data.put(b);
- }
-
- protected void writeShort(short i) throws JMSException
- {
- writeTypeDiscriminator(SHORT_TYPE);
- _data.putShort(i);
- }
-
- protected void writeChar(char c) throws JMSException
- {
- writeTypeDiscriminator(CHAR_TYPE);
- _data.putChar(c);
- }
-
- protected void writeInt(int i) throws JMSException
- {
- writeTypeDiscriminator(INT_TYPE);
- writeIntImpl(i);
- }
-
- protected void writeIntImpl(int i)
- {
- _data.putInt(i);
- }
-
- protected void writeLong(long l) throws JMSException
- {
- writeTypeDiscriminator(LONG_TYPE);
- _data.putLong(l);
- }
- protected void writeFloat(float v) throws JMSException
- {
- writeTypeDiscriminator(FLOAT_TYPE);
- _data.putFloat(v);
}
- protected void writeDouble(double v) throws JMSException
- {
- writeTypeDiscriminator(DOUBLE_TYPE);
- _data.putDouble(v);
- }
- protected void writeString(String string) throws JMSException
- {
- if (string == null)
- {
- writeTypeDiscriminator(NULL_STRING_TYPE);
- }
- else
- {
- writeTypeDiscriminator(STRING_TYPE);
- try
- {
- writeStringImpl(string);
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Unable to encode string: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
- }
- }
-
- protected void writeStringImpl(String string)
- throws CharacterCodingException
- {
- _data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must write the null terminator ourselves
- _data.put((byte) 0);
- }
+ abstract public void reset();
- protected void writeBytes(byte[] bytes) throws JMSException
- {
- writeBytes(bytes, 0, bytes == null ? 0 : bytes.length);
- }
- protected void writeBytes(byte[] bytes, int offset, int length) throws JMSException
- {
- writeTypeDiscriminator(BYTEARRAY_TYPE);
- if (bytes == null)
- {
- _data.putInt(-1);
- }
- else
- {
- _data.putInt(length);
- _data.put(bytes, offset, length);
- }
- }
- protected void writeObject(Object object) throws JMSException
- {
- checkWritable();
- Class clazz;
- if (object == null)
- {
- // string handles the output of null values
- clazz = String.class;
- }
- else
- {
- clazz = object.getClass();
- }
-
- if (clazz == Byte.class)
- {
- writeByte((Byte) object);
- }
- else if (clazz == Boolean.class)
- {
- writeBoolean((Boolean) object);
- }
- else if (clazz == byte[].class)
- {
- writeBytes((byte[]) object);
- }
- else if (clazz == Short.class)
- {
- writeShort((Short) object);
- }
- else if (clazz == Character.class)
- {
- writeChar((Character) object);
- }
- else if (clazz == Integer.class)
- {
- writeInt((Integer) object);
- }
- else if (clazz == Long.class)
- {
- writeLong((Long) object);
- }
- else if (clazz == Float.class)
- {
- writeFloat((Float) object);
- }
- else if (clazz == Double.class)
- {
- writeDouble((Double) object);
- }
- else if (clazz == String.class)
- {
- writeString((String) object);
- }
- else
- {
- throw new MessageFormatException("Only primitives plus byte arrays and String are valid types");
- }
- }
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Mon Sep 19 15:13:18 2011
@@ -20,66 +20,38 @@
*/
package org.apache.qpid.client.message;
-import java.io.IOException;
+import java.nio.ByteBuffer;
import java.util.Enumeration;
import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MessageNotReadableException;
import javax.jms.MessageNotWriteableException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
public abstract class AbstractJMSMessage implements org.apache.qpid.jms.Message
{
-
- protected ByteBuffer _data;
- protected boolean _readableMessage = false;
- protected boolean _changedData = true;
-
/** If the acknowledge mode is CLIENT_ACKNOWLEDGE the session is required */
-
-
-
protected AMQMessageDelegate _delegate;
private boolean _redelivered;
+ private boolean _receivedFromServer;
- protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+ protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, boolean fromReceivedData)
{
_delegate = delegateFactory.createDelegate();
- _data = data;
- if (_data != null)
- {
- _data.acquire();
- }
-
-
- _readableMessage = (data != null);
- _changedData = (data == null);
-
+ setContentType(getMimeType());
}
- protected AbstractJMSMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ protected AbstractJMSMessage(AMQMessageDelegate delegate, boolean fromReceivedData) throws AMQException
{
_delegate = delegate;
-
- _data = data;
- if (_data != null)
- {
- _data.acquire();
- }
-
- _readableMessage = data != null;
-
+ setContentType(getMimeType());
}
public String getJMSMessageID() throws JMSException
@@ -329,12 +301,9 @@ public abstract class AbstractJMSMessage
public void clearBody() throws JMSException
{
- clearBodyImpl();
- _readableMessage = false;
-
+ _receivedFromServer = false;
}
-
public void acknowledgeThis() throws JMSException
{
_delegate.acknowledgeThis();
@@ -345,14 +314,7 @@ public abstract class AbstractJMSMessage
_delegate.acknowledge();
}
- /**
- * This forces concrete classes to implement clearBody()
- *
- * @throws JMSException
- */
- public abstract void clearBodyImpl() throws JMSException;
-
- /**
+ /*
* Get a String representation of the body of the message. Used in the toString() method which outputs this before
* message properties.
*/
@@ -413,63 +375,24 @@ public abstract class AbstractJMSMessage
return _delegate;
}
- public ByteBuffer getData()
- {
- // make sure we rewind the data just in case any method has moved the
- // position beyond the start
- if (_data != null)
- {
- reset();
- }
+ abstract public ByteBuffer getData() throws JMSException;
- return _data;
- }
-
- protected void checkReadable() throws MessageNotReadableException
- {
- if (!_readableMessage)
- {
- throw new MessageNotReadableException("You need to call reset() to make the message readable");
- }
- }
protected void checkWritable() throws MessageNotWriteableException
{
- if (_readableMessage)
+ if (_receivedFromServer)
{
throw new MessageNotWriteableException("You need to call clearBody() to make the message writable");
}
}
- public void reset()
- {
- if (!_changedData)
- {
- _data.rewind();
- }
- else
- {
- _data.flip();
- _changedData = false;
- }
- }
- public int getContentLength()
+ public void setReceivedFromServer()
{
- if(_data != null)
- {
- return _data.remaining();
- }
- else
- {
- return 0;
- }
+ _receivedFromServer = true;
}
- public void receivedFromServer()
- {
- _changedData = false;
- }
+
/**
* The session is set when CLIENT_ACKNOWLEDGE mode is used so that the CHANNEL ACK can be sent when the user calls
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -20,8 +20,6 @@
*/
package org.apache.qpid.client.message;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.ContentBody;
@@ -38,6 +36,8 @@ import javax.jms.JMSException;
import java.util.Iterator;
import java.util.List;
+import java.nio.ByteBuffer;
+
public abstract class AbstractJMSMessageFactory implements MessageFactory
{
private static final Logger _logger = LoggerFactory.getLogger(AbstractJMSMessageFactory.class);
@@ -57,7 +57,7 @@ public abstract class AbstractJMSMessage
_logger.debug("Non-fragmented message body (bodySize=" + contentHeader.bodySize + ")");
}
- data = ((ContentBody) bodies.get(0)).payload;
+ data = ByteBuffer.wrap(((ContentBody) bodies.get(0))._payload);
}
else if (bodies != null)
{
@@ -72,7 +72,7 @@ public abstract class AbstractJMSMessage
while (it.hasNext())
{
ContentBody cb = (ContentBody) it.next();
- final ByteBuffer payload = cb.payload;
+ final ByteBuffer payload = ByteBuffer.wrap(cb._payload);
if(payload.isDirect() || payload.isReadOnly())
{
data.put(payload);
@@ -82,7 +82,6 @@ public abstract class AbstractJMSMessage
data.put(payload.array(), payload.arrayOffset(), payload.limit());
}
- payload.release();
}
data.flip();
@@ -109,7 +108,7 @@ public abstract class AbstractJMSMessage
protected AbstractJMSMessage create010MessageWithBody(long messageNbr, MessageProperties msgProps,
- DeliveryProperties deliveryProps,
+ DeliveryProperties deliveryProps,
java.nio.ByteBuffer body) throws AMQException
{
ByteBuffer data;
@@ -118,7 +117,7 @@ public abstract class AbstractJMSMessage
if (body != null)
{
- data = ByteBuffer.wrap(body);
+ data = body;
}
else // body == null
{
@@ -155,7 +154,7 @@ public abstract class AbstractJMSMessage
{
final AbstractJMSMessage msg = create08MessageWithBody(messageNbr, contentHeader, exchange, routingKey, bodies);
msg.setJMSRedelivered(redelivered);
- msg.receivedFromServer();
+ msg.setReceivedFromServer();
return msg;
}
@@ -166,7 +165,7 @@ public abstract class AbstractJMSMessage
final AbstractJMSMessage msg =
create010MessageWithBody(messageNbr,msgProps,deliveryProps, body);
msg.setJMSRedelivered(redelivered);
- msg.receivedFromServer();
+ msg.setReceivedFromServer();
return msg;
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Mon Sep 19 15:13:18 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.message;
+import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.Charset;
@@ -28,47 +29,56 @@ import java.nio.charset.CharsetEncoder;
import javax.jms.BytesMessage;
import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
-public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
+public class JMSBytesMessage extends AbstractBytesTypedMessage implements BytesMessage
{
public static final String MIME_TYPE = "application/octet-stream";
+ private TypedBytesContentReader _typedBytesContentReader;
+ private TypedBytesContentWriter _typedBytesContentWriter;
- public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory)
- {
- this(delegateFactory,null);
-
- }
- /**
- * Construct a bytes message with existing data.
- *
- * @param delegateFactory
- * @param data the data that comprises this message. If data is null, you get a 1024 byte buffer that is
- */
- JMSBytesMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
+ public JMSBytesMessage(AMQMessageDelegateFactory delegateFactory)
{
-
- super(delegateFactory, data); // this instanties a content header
+ super(delegateFactory,false);
+ _typedBytesContentWriter = new TypedBytesContentWriter();
}
JMSBytesMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(delegate, data);
+ super(delegate, data!=null);
+ _typedBytesContentReader = new TypedBytesContentReader(data);
}
public void reset()
{
- super.reset();
_readableMessage = true;
+
+ if(_typedBytesContentReader != null)
+ {
+ _typedBytesContentReader.reset();
+ }
+ else if (_typedBytesContentWriter != null)
+ {
+ _typedBytesContentReader = new TypedBytesContentReader(_typedBytesContentWriter.getData());
+ }
+ }
+
+ @Override
+ public void clearBody() throws JMSException
+ {
+ super.clearBody();
+ _typedBytesContentReader = null;
+ _typedBytesContentWriter = new TypedBytesContentWriter();
+
}
protected String getMimeType()
@@ -76,45 +86,57 @@ public class JMSBytesMessage extends Abs
return MIME_TYPE;
}
+ @Override
+ public java.nio.ByteBuffer getData() throws JMSException
+ {
+ return _typedBytesContentWriter == null ? _typedBytesContentReader.getData() : _typedBytesContentWriter.getData();
+ }
+
public long getBodyLength() throws JMSException
{
checkReadable();
- return _data.limit();
+ return _typedBytesContentReader.size();
}
public boolean readBoolean() throws JMSException
{
checkReadable();
checkAvailable(1);
- return _data.get() != 0;
+
+ return _typedBytesContentReader.readBooleanImpl();
+ }
+
+ private void checkAvailable(final int i) throws MessageEOFException
+ {
+ _typedBytesContentReader.checkAvailable(1);
}
public byte readByte() throws JMSException
{
checkReadable();
checkAvailable(1);
- return _data.get();
+ return _typedBytesContentReader.readByteImpl();
}
public int readUnsignedByte() throws JMSException
{
checkReadable();
checkAvailable(1);
- return _data.getUnsigned();
+ return _typedBytesContentReader.readByteImpl() & 0xFF;
}
public short readShort() throws JMSException
{
checkReadable();
checkAvailable(2);
- return _data.getShort();
+ return _typedBytesContentReader.readShortImpl();
}
public int readUnsignedShort() throws JMSException
{
checkReadable();
checkAvailable(2);
- return _data.getUnsignedShort();
+ return _typedBytesContentReader.readShortImpl() & 0xFFFF;
}
/**
@@ -127,35 +149,35 @@ public class JMSBytesMessage extends Abs
{
checkReadable();
checkAvailable(2);
- return _data.getChar();
+ return _typedBytesContentReader.readCharImpl();
}
public int readInt() throws JMSException
{
checkReadable();
checkAvailable(4);
- return _data.getInt();
+ return _typedBytesContentReader.readIntImpl();
}
public long readLong() throws JMSException
{
checkReadable();
checkAvailable(8);
- return _data.getLong();
+ return _typedBytesContentReader.readLongImpl();
}
public float readFloat() throws JMSException
{
checkReadable();
checkAvailable(4);
- return _data.getFloat();
+ return _typedBytesContentReader.readFloatImpl();
}
public double readDouble() throws JMSException
{
checkReadable();
checkAvailable(8);
- return _data.getDouble();
+ return _typedBytesContentReader.readDoubleImpl();
}
public String readUTF() throws JMSException
@@ -164,34 +186,7 @@ public class JMSBytesMessage extends Abs
// we check only for one byte since theoretically the string could be only a
// single byte when using UTF-8 encoding
- try
- {
- short length = readShort();
- if(length == 0)
- {
- return "";
- }
- else
- {
- CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
- ByteBuffer encodedString = _data.slice();
- encodedString.limit(length);
- _data.position(_data.position()+length);
- CharBuffer string = decoder.decode(encodedString.buf());
-
- return string.toString();
- }
-
-
-
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Error decoding byte stream as a UTF8 string: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
+ return _typedBytesContentReader.readLengthPrefixedUTF();
}
public int readBytes(byte[] bytes) throws JMSException
@@ -201,14 +196,14 @@ public class JMSBytesMessage extends Abs
throw new IllegalArgumentException("byte array must not be null");
}
checkReadable();
- int count = (_data.remaining() >= bytes.length ? bytes.length : _data.remaining());
+ int count = (_typedBytesContentReader.remaining() >= bytes.length ? bytes.length : _typedBytesContentReader.remaining());
if (count == 0)
{
return -1;
}
else
{
- _data.get(bytes, 0, count);
+ _typedBytesContentReader.readRawBytes(bytes, 0, count);
return count;
}
}
@@ -224,110 +219,82 @@ public class JMSBytesMessage extends Abs
throw new IllegalArgumentException("maxLength must be <= bytes.length");
}
checkReadable();
- int count = (_data.remaining() >= maxLength ? maxLength : _data.remaining());
+ int count = (_typedBytesContentReader.remaining() >= maxLength ? maxLength : _typedBytesContentReader.remaining());
if (count == 0)
{
return -1;
}
else
{
- _data.get(bytes, 0, count);
+ _typedBytesContentReader.readRawBytes(bytes, 0, count);
return count;
}
}
+
public void writeBoolean(boolean b) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.put(b ? (byte) 1 : (byte) 0);
+ _typedBytesContentWriter.writeBooleanImpl(b);
}
public void writeByte(byte b) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.put(b);
+ _typedBytesContentWriter.writeByteImpl(b);
}
public void writeShort(short i) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putShort(i);
+ _typedBytesContentWriter.writeShortImpl(i);
}
public void writeChar(char c) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putChar(c);
+ _typedBytesContentWriter.writeCharImpl(c);
}
public void writeInt(int i) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putInt(i);
+ _typedBytesContentWriter.writeIntImpl(i);
}
public void writeLong(long l) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putLong(l);
+ _typedBytesContentWriter.writeLongImpl(l);
}
public void writeFloat(float v) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putFloat(v);
+ _typedBytesContentWriter.writeFloatImpl(v);
}
public void writeDouble(double v) throws JMSException
{
checkWritable();
- _changedData = true;
- _data.putDouble(v);
+ _typedBytesContentWriter.writeDoubleImpl(v);
}
public void writeUTF(String string) throws JMSException
{
checkWritable();
- try
- {
- CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
- java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
-
- _data.putShort((short)encodedString.limit());
- _data.put(encodedString);
- _changedData = true;
- //_data.putString(string, Charset.forName("UTF-8").newEncoder());
- // we must add the null terminator manually
- //_data.put((byte)0);
- }
- catch (CharacterCodingException e)
- {
- JMSException jmse = new JMSException("Unable to encode string: " + e);
- jmse.setLinkedException(e);
- jmse.initCause(e);
- throw jmse;
- }
+ _typedBytesContentWriter.writeLengthPrefixedUTF(string);
}
public void writeBytes(byte[] bytes) throws JMSException
{
- checkWritable();
- _data.put(bytes);
- _changedData = true;
+ writeBytes(bytes, 0, bytes.length);
}
public void writeBytes(byte[] bytes, int offset, int length) throws JMSException
{
checkWritable();
- _data.put(bytes, offset, length);
- _changedData = true;
+ _typedBytesContentWriter.writeBytesRaw(bytes, offset, length);
}
public void writeObject(Object object) throws JMSException
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -22,11 +22,12 @@ package org.apache.qpid.client.message;
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
+import java.nio.ByteBuffer;
+
public class JMSBytesMessageFactory extends AbstractJMSMessageFactory
{
protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSHeaderAdapter.java Mon Sep 19 15:13:18 2011
@@ -20,12 +20,15 @@
*/
package org.apache.qpid.client.message;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Enumeration;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQPInvalidClassException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
@@ -282,7 +285,7 @@ public final class JMSHeaderAdapter
s = String.valueOf(o);
}
}
- }//else return s // null;
+ }//else return s // null;
}
return s;
@@ -458,9 +461,29 @@ public final class JMSHeaderAdapter
return getHeaders().isEmpty();
}
- public void writeToBuffer(ByteBuffer data)
+ public void writeToBuffer(final ByteBuffer data)
{
- getHeaders().writeToBuffer(data);
+ try
+ {
+ getHeaders().writeToBuffer(new DataOutputStream(new OutputStream()
+ {
+ @Override
+ public void write(final int b)
+ {
+ data.put((byte)b);
+ }
+
+ @Override
+ public void write(final byte[] b, final int off, final int len)
+ {
+ data.put(b, off, len);
+ }
+ }));
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("Unexpected IO Exception - should never happen", e);
+ }
}
public Enumeration getMapNames()
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Mon Sep 19 15:13:18 2011
@@ -20,11 +20,8 @@
*/
package org.apache.qpid.client.message;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,13 +29,14 @@ import org.slf4j.LoggerFactory;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
+import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
-public class JMSMapMessage extends AbstractBytesTypedMessage implements javax.jms.MapMessage
+public class JMSMapMessage extends AbstractJMSMessage implements javax.jms.MapMessage
{
private static final Logger _logger = LoggerFactory.getLogger(JMSMapMessage.class);
@@ -54,10 +52,10 @@ public class JMSMapMessage extends Abstr
JMSMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
{
- super(delegateFactory, data); // this instantiates a content header
+ super(delegateFactory, data!=null); // this instantiates a content header
if(data != null)
{
- populateMapFromData();
+ populateMapFromData(data);
}
}
@@ -65,10 +63,10 @@ public class JMSMapMessage extends Abstr
JMSMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
{
- super(delegate, data);
+ super(delegate, data != null);
try
{
- populateMapFromData();
+ populateMapFromData(data);
}
catch (JMSException je)
{
@@ -89,18 +87,10 @@ public class JMSMapMessage extends Abstr
return MIME_TYPE;
}
- public ByteBuffer getData()
- {
- // What if _data is null?
- writeMapToData();
-
- return super.getData();
- }
-
@Override
- public void clearBodyImpl() throws JMSException
+ public void clearBody() throws JMSException
{
- super.clearBodyImpl();
+ super.clearBody();
_map.clear();
}
@@ -458,17 +448,18 @@ public class JMSMapMessage extends Abstr
return _map.containsKey(propName);
}
- protected void populateMapFromData() throws JMSException
+ protected void populateMapFromData(ByteBuffer data) throws JMSException
{
- if (_data != null)
+ TypedBytesContentReader reader = new TypedBytesContentReader(data);
+ if (data != null)
{
- _data.rewind();
+ data.rewind();
- final int entries = readIntImpl();
+ final int entries = reader.readIntImpl();
for (int i = 0; i < entries; i++)
{
- String propName = readStringImpl();
- Object value = readObject();
+ String propName = reader.readStringImpl();
+ Object value = reader.readObject();
_map.put(propName, value);
}
}
@@ -478,35 +469,21 @@ public class JMSMapMessage extends Abstr
}
}
- protected void writeMapToData()
+ public ByteBuffer getData()
+ throws JMSException
{
- allocateInitialBuffer();
+ TypedBytesContentWriter writer = new TypedBytesContentWriter();
+
final int size = _map.size();
- writeIntImpl(size);
+ writer.writeIntImpl(size);
for (Map.Entry<String, Object> entry : _map.entrySet())
{
- try
- {
- writeStringImpl(entry.getKey());
- }
- catch (CharacterCodingException e)
- {
- throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey(), e);
-
- }
+ writer.writeNullTerminatedStringImpl(entry.getKey());
- try
- {
- writeObject(entry.getValue());
- }
- catch (JMSException e)
- {
- Object value = entry.getValue();
- throw new IllegalArgumentException("Cannot encode property key name " + entry.getKey() + " value : " + value
- + " (type: " + value.getClass().getName() + ").", e);
- }
+ writer.writeObject(entry.getValue());
}
+ return writer.getData();
}
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -14,18 +14,16 @@
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
- * under the License.
+ * under the License.
+ *
*
- *
*/
package org.apache.qpid.client.message;
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSMapMessageFactory extends AbstractJMSMessageFactory
{
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Mon Sep 19 15:13:18 2011
@@ -20,26 +20,28 @@
*/
package org.apache.qpid.client.message;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.io.Serializable;
+import java.io.*;
+import java.nio.ByteBuffer;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.ObjectMessage;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.AMQException;
+import org.apache.qpid.client.util.ClassLoadingAwareObjectInputStream;
public class JMSObjectMessage extends AbstractJMSMessage implements ObjectMessage
{
public static final String MIME_TYPE = "application/java-object-stream";
+ private static final int DEFAULT_OUTPUT_BUFFER_SIZE = 256;
+
+ private Serializable _readData;
+ private ByteBuffer _data;
+ private Exception _exception;
+
+ private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);
- private static final int DEFAULT_BUFFER_SIZE = 1024;
/**
* Creates empty, writable message for use by producers
@@ -47,41 +49,57 @@ public class JMSObjectMessage extends Ab
*/
public JMSObjectMessage(AMQMessageDelegateFactory delegateFactory)
{
- this(delegateFactory, null);
- }
-
- private JMSObjectMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
- {
- super(delegateFactory, data);
- if (data == null)
- {
- _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
- _data.setAutoExpand(true);
- }
-
- setContentType(getMimeType());
+ super(delegateFactory, false);
}
/**
* Creates read only message for delivery to consumers
*/
- JMSObjectMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ JMSObjectMessage(AMQMessageDelegate delegate, final ByteBuffer data) throws AMQException
{
- super(delegate, data);
+ super(delegate, data!=null);
+
+ try
+ {
+ ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
+ {
+
+
+ @Override
+ public int read() throws IOException
+ {
+ return data.get();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ len = data.remaining() < len ? data.remaining() : len;
+ data.get(b, off, len);
+ return len;
+ }
+ });
+
+ _readData = (Serializable) in.readObject();
+ }
+ catch (IOException e)
+ {
+ _exception = e;
+ }
+ catch (ClassNotFoundException e)
+ {
+ _exception = e;
+ }
}
- public void clearBodyImpl() throws JMSException
+ public void clearBody() throws JMSException
{
- if (_data != null)
- {
- _data.release();
- _data = null;
- }
-
-
-
+ super.clearBody();
+ _exception = null;
+ _readData = null;
+ _data = null;
}
public String toBodyString() throws JMSException
@@ -94,83 +112,116 @@ public class JMSObjectMessage extends Ab
return MIME_TYPE;
}
- public void setObject(Serializable serializable) throws JMSException
+ @Override
+ public ByteBuffer getData() throws JMSException
{
- checkWritable();
-
- if (_data == null)
+ if(_exception != null)
{
- _data = ByteBuffer.allocate(DEFAULT_BUFFER_SIZE);
- _data.setAutoExpand(true);
+ final MessageFormatException messageFormatException =
+ new MessageFormatException("Unable to deserialize message");
+ messageFormatException.setLinkedException(_exception);
+ throw messageFormatException;
+ }
+ if(_readData == null)
+ {
+
+ return _data == null ? EMPTY_BYTE_BUFFER : _data.duplicate();
}
else
{
- _data.rewind();
+ try
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE);
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(_readData);
+ oos.flush();
+ return ByteBuffer.wrap(baos.toByteArray());
+ }
+ catch (IOException e)
+ {
+ final JMSException jmsException = new JMSException("Unable to encode object of type: " +
+ _readData.getClass().getName() + ", value " + _readData);
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
}
+ }
+
+ public void setObject(Serializable serializable) throws JMSException
+ {
+ checkWritable();
+ clearBody();
try
{
- ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
- out.writeObject(serializable);
- out.flush();
- out.close();
+ ByteArrayOutputStream baos = new ByteArrayOutputStream(DEFAULT_OUTPUT_BUFFER_SIZE);
+ ObjectOutputStream oos = new ObjectOutputStream(baos);
+ oos.writeObject(serializable);
+ oos.flush();
+ _data = ByteBuffer.wrap(baos.toByteArray());
}
catch (IOException e)
{
- MessageFormatException mfe = new MessageFormatException("Message not serializable: " + e);
- mfe.setLinkedException(e);
- mfe.initCause(e);
- throw mfe;
+ final JMSException jmsException = new JMSException("Unable to encode object of type: " +
+ serializable.getClass().getName() + ", value " + serializable);
+ jmsException.setLinkedException(e);
+ throw jmsException;
}
}
public Serializable getObject() throws JMSException
{
- ObjectInputStream in = null;
- if (_data == null)
+ if(_exception != null)
{
- return null;
+ final MessageFormatException messageFormatException = new MessageFormatException("Unable to deserialize message");
+ messageFormatException.setLinkedException(_exception);
+ throw messageFormatException;
}
-
- try
+ else if(_readData != null || _data == null)
{
- _data.rewind();
- in = new ObjectInputStream(_data.asInputStream());
-
- return (Serializable) in.readObject();
+ return _readData;
}
- catch (IOException e)
- {
- MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
- mfe.setLinkedException(e);
- mfe.initCause(e);
- throw mfe;
- }
- catch (ClassNotFoundException e)
- {
- MessageFormatException mfe = new MessageFormatException("Could not deserialize message: " + e);
- mfe.setLinkedException(e);
- mfe.initCause(e);
- throw mfe;
- }
- finally
+ else
{
- // _data.rewind();
- close(in);
- }
- }
+ Exception exception = null;
- private static void close(InputStream in)
- {
- try
- {
- if (in != null)
+ final ByteBuffer data = _data.duplicate();
+ try
{
- in.close();
+ ClassLoadingAwareObjectInputStream in = new ClassLoadingAwareObjectInputStream(new InputStream()
+ {
+ @Override
+ public int read() throws IOException
+ {
+ return data.get();
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException
+ {
+ len = data.remaining() < len ? data.remaining() : len;
+ data.get(b, off, len);
+ return len;
+ }
+ });
+
+ return (Serializable) in.readObject();
}
+ catch (ClassNotFoundException e)
+ {
+ exception = e;
+ }
+ catch (IOException e)
+ {
+ exception = e;
+ }
+
+ JMSException jmsException = new JMSException("Could not deserialize object");
+ jmsException.setLinkedException(exception);
+ throw jmsException;
}
- catch (IOException ignore)
- { }
+
}
+
}
Modified: qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessageFactory.java Mon Sep 19 15:13:18 2011
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,10 +22,8 @@ package org.apache.qpid.client.message;
import javax.jms.JMSException;
-import org.apache.mina.common.ByteBuffer;
+import java.nio.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
public class JMSObjectMessageFactory extends AbstractJMSMessageFactory
{
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org