You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/01/28 00:44:45 UTC
svn commit: r903911 - in /qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/message/
systests/src/main/java/org/apache/qpid/client/message/
Author: rajith
Date: Wed Jan 27 23:44:44 2010
New Revision: 903911
URL: http://svn.apache.org/viewvc?rev=903911&view=rev
Log:
This is related to QPID-2363
I am comitting the patch as it is.
I will make the agreed changes in a subsequent commit shortly.
Added:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
Modified:
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Jan 27 23:44:44 2010
@@ -61,6 +61,11 @@
import java.util.Timer;
import java.util.TimerTask;
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+
/**
* This is a 0.10 Session
*/
@@ -122,6 +127,7 @@
private TimerTask flushTask = null;
private RangeSet unacked = new RangeSet();
private int unackedCount = 0;
+ private boolean useAMQPEncodedMapMessage = !Boolean.getBoolean("qpid.use_legacy_map_message");
/**
* USed to store the range of in tx messages
@@ -933,4 +939,17 @@
return AMQMessageDelegateFactory.FACTORY_0_10;
}
+ @ Override
+ public MapMessage createMapMessage() throws JMSException
+ {
+ checkNotClosed();
+ if (useAMQPEncodedMapMessage)
+ {
+ return new AMQPEncodedMapMessage(AMQMessageDelegateFactory.FACTORY_0_10);
+ }
+ else
+ {
+ return new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_10);
+ }
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java Wed Jan 27 23:44:44 2010
@@ -25,7 +25,10 @@
import javax.jms.Destination;
import javax.jms.JMSException;
+
+import java.nio.ByteBuffer;
import java.util.Enumeration;
+import java.util.Map;
import java.util.UUID;
public interface AMQMessageDelegate
@@ -134,4 +137,8 @@
long getDeliveryTag();
void setJMSMessageID(final UUID messageId) throws JMSException;
+
+ ByteBuffer encodeMap(Map<String,Object> map);
+
+ Map<String,Object> decodeMap(ByteBuffer buf);
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Wed Jan 27 23:44:44 2010
@@ -21,26 +21,36 @@
package org.apache.qpid.client.message;
-import org.apache.commons.collections.map.ReferenceMap;
-import org.apache.qpid.client.*;
-import org.apache.qpid.framing.ContentHeaderProperties;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.jms.Message;
-import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.transport.*;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
import javax.jms.MessageFormatException;
-import javax.jms.DeliveryMode;
-import java.util.*;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.Message;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.codec.BBEncoder;
/**
* This extends AbstractAMQMessageDelegate which contains common code between
@@ -912,4 +922,18 @@
return _deliveryProps;
}
+
+ public java.nio.ByteBuffer encodeMap(Map<String,Object> map)
+ {
+ BBEncoder encoder = new BBEncoder(1024);
+ encoder.writeMap(map);
+ return encoder.segment();
+ }
+
+ public Map<String,Object> decodeMap(java.nio.ByteBuffer buf)
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(buf);
+ return decoder.readMap();
+ }
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Wed Jan 27 23:44:44 2010
@@ -573,5 +573,25 @@
return _deliveryTag;
}
+
+ public java.nio.ByteBuffer encodeMap(Map<String,Object> map)
+ {
+ String errorMsg = "There is no support for encoding maps";
+ if (_session != null)
+ {
+ errorMsg = errorMsg + " in AMQP " + _session.getAMQConnection().getProtocolVersion();
+ }
+ throw new UnsupportedOperationException(errorMsg);
+ }
+
+ public Map<String,Object> decodeMap(java.nio.ByteBuffer buf)
+ {
+ String errorMsg = "There is no support for encoding maps";
+ if (_session != null)
+ {
+ errorMsg = errorMsg + " in AMQP " + _session.getAMQConnection().getProtocolVersion();
+ }
+ throw new UnsupportedOperationException(errorMsg);
+ }
}
Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java?rev=903911&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java Wed Jan 27 23:44:44 2010
@@ -0,0 +1,73 @@
+package org.apache.qpid.client.message;
+
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+
+public class AMQPEncodedMapMessage extends JMSMapMessage
+{
+ public static final String MIME_TYPE = "amqp/map";
+
+ public AMQPEncodedMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
+ {
+ this(delegateFactory, null);
+ }
+
+ AMQPEncodedMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
+ {
+ super(delegateFactory, data);
+ }
+
+ AMQPEncodedMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+ {
+ super(delegate, data);
+ }
+
+ @ Override
+ protected String getMimeType()
+ {
+ return MIME_TYPE;
+ }
+
+ // The super clas methods resets the buffer
+ @ Override
+ public ByteBuffer getData()
+ {
+ writeMapToData();
+ return _data;
+ }
+
+ @ Override
+ protected void populateMapFromData() throws JMSException
+ {
+ if (_data != null)
+ {
+ _data.rewind();
+ _map = _delegate.decodeMap(_data.buf());
+ }
+ else
+ {
+ _map.clear();
+ }
+ }
+
+ @ Override
+ protected void writeMapToData()
+ {
+ _data = ByteBuffer.wrap(_delegate.encodeMap(_map));
+ }
+
+ // for testing
+ Map<String,Object> getMap()
+ {
+ return _map;
+ }
+
+ void setMap(Map<String,Object> map)
+ {
+ _map = map;
+ }
+}
Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java?rev=903911&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java Wed Jan 27 23:44:44 2010
@@ -0,0 +1,25 @@
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+
+public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
+{
+
+ @Override
+ protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate,
+ ByteBuffer data) throws AMQException
+ {
+ return new AMQPEncodedMapMessage(delegate,data);
+ }
+
+ @Override
+ public AbstractJMSMessage createMessage(
+ AMQMessageDelegateFactory delegateFactory) throws JMSException
+ {
+ return new AMQPEncodedMapMessage(delegateFactory);
+ }
+
+}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Jan 27 23:44:44 2010
@@ -49,7 +49,7 @@
- private AMQMessageDelegate _delegate;
+ protected AMQMessageDelegate _delegate;
private boolean _redelivered;
protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
@@ -379,6 +379,7 @@
buf.append("\nJMS Destination: ").append(getJMSDestination());
buf.append("\nJMS Type: ").append(getJMSType());
buf.append("\nJMS MessageID: ").append(getJMSMessageID());
+ buf.append("\nJMS Content-Type: ").append(getContentType());
buf.append("\nAMQ message number: ").append(getDeliveryTag());
buf.append("\nProperties:");
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Wed Jan 27 23:44:44 2010
@@ -44,8 +44,7 @@
public static final String MIME_TYPE = "jms/map-message";
-
- private Map<String, Object> _map = new HashMap<String, Object>();
+ protected Map<String, Object> _map = new HashMap<String, Object>();
public JMSMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
{
@@ -459,7 +458,7 @@
return _map.containsKey(propName);
}
- private void populateMapFromData() throws JMSException
+ protected void populateMapFromData() throws JMSException
{
if (_data != null)
{
@@ -479,7 +478,7 @@
}
}
- private void writeMapToData()
+ protected void writeMapToData()
{
allocateInitialBuffer();
final int size = _map.size();
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Wed Jan 27 23:44:44 2010
@@ -64,6 +64,7 @@
mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
+ mf.registerFactory(AMQPEncodedMapMessage.MIME_TYPE, new AMQPEncodedMapMessageFactory());
mf.registerFactory(null, mf._default);
return mf;
Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java?rev=903911&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java Wed Jan 27 23:44:44 2010
@@ -0,0 +1,138 @@
+package org.apache.qpid.client.message;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+
+public class AMQPEncodedMapMessageTest extends QpidTestCase
+{
+ private Connection _connection;
+ private Session _session;
+ MessageConsumer _consumer;
+ MessageProducer _producer;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ //Create Connection
+ _connection = getConnection();
+
+ //Create Session
+ _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ //Create Queue
+ String queueName = getTestQueueName();
+ Queue queue = _session.createQueue(queueName);
+
+ //Create Consumer
+ _consumer = _session.createConsumer(queue);
+
+ //Create Producer
+ _producer = _session.createProducer(queue);
+
+ _connection.start();
+ }
+
+ public void testEmptyMessage() throws JMSException
+ {
+ if (((AMQConnection)_connection).getProtocolVersion() == ProtocolVersion.v0_10)
+ {
+ MapMessage m = _session.createMapMessage();
+ _producer.send(m);
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ assertEquals("Message content should be an empty map",
+ Collections.EMPTY_MAP,
+ ((AMQPEncodedMapMessage)msg).getMap());
+
+ }
+ }
+
+ public void testNullMessage() throws JMSException
+ {
+ if (((AMQConnection)_connection).getProtocolVersion() == ProtocolVersion.v0_10)
+ {
+ MapMessage m = _session.createMapMessage();
+ ((AMQPEncodedMapMessage)m).setMap(null);
+ _producer.send(m);
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ assertEquals("Message content should be null",
+ null,
+ ((AMQPEncodedMapMessage)msg).getMap());
+
+ }
+ }
+
+ public void testMessageWithContent() throws JMSException
+ {
+ if (((AMQConnection)_connection).getProtocolVersion() == ProtocolVersion.v0_10)
+ {
+ MapMessage m = _session.createMapMessage();
+ m.setBoolean("Boolean", true);
+ m.setByte("Byte", (byte)5);
+ byte[] bytes = new byte[]{(byte)5,(byte)8};
+ m.setBytes("Bytes", bytes);
+ m.setChar("Char", 'X');
+ m.setDouble("Double", 56.84);
+ m.setFloat("Float", Integer.MAX_VALUE + 5000);
+ m.setInt("Int", Integer.MAX_VALUE - 5000);
+ m.setShort("Short", (short)58);
+ m.setString("String", "Hello");
+ _producer.send(m);
+
+ AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+ assertNotNull("Message was not received on time",msg);
+ assertEquals("Message content-type is incorrect",
+ AMQPEncodedMapMessage.MIME_TYPE,
+ ((AbstractJMSMessage)msg).getContentType());
+
+ assertEquals(true,m.getBoolean("Boolean"));
+ assertEquals((byte)5,m.getByte("Byte"));
+ byte[] bytesRcv = m.getBytes("Bytes");
+ assertNotNull("Byte array is null",bytesRcv);
+ assertEquals((byte)5,bytesRcv[0]);
+ assertEquals((byte)8,bytesRcv[1]);
+ assertEquals('X',m.getChar("Char"));
+ assertEquals(56.84,m.getDouble("Double"));
+ //assertEquals(Integer.MAX_VALUE + 5000,m.getFloat("Float"));
+ assertEquals(Integer.MAX_VALUE - 5000,m.getInt("Int"));
+ assertEquals((short)58,m.getShort("Short"));
+ assertEquals("Hello",m.getString("String"));
+ }
+ }
+
+ public void tearDown() throws Exception
+ {
+ //clean up
+ _connection.close();
+
+ super.tearDown();
+ }
+}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org