You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2010/01/28 00:44:45 UTC

svn commit: r903911 - in /qpid/trunk/qpid/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/message/ systests/src/main/java/org/apache/qpid/client/message/

Author: rajith
Date: Wed Jan 27 23:44:44 2010
New Revision: 903911

URL: http://svn.apache.org/viewvc?rev=903911&view=rev
Log:
This is related to QPID-2363
I am comitting the patch as it is.
I will make the agreed changes in a subsequent commit shortly.

Added:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
    qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
Modified:
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
    qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java Wed Jan 27 23:44:44 2010
@@ -61,6 +61,11 @@
 import java.util.Timer;
 import java.util.TimerTask;
 
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.JMSMapMessage;
+import org.apache.qpid.client.message.MessageFactoryRegistry;
+import org.apache.qpid.client.message.AMQMessageDelegateFactory;
+
 /**
  * This is a 0.10 Session
  */
@@ -122,6 +127,7 @@
     private TimerTask flushTask = null;
     private RangeSet unacked = new RangeSet();
     private int unackedCount = 0;
+    private boolean useAMQPEncodedMapMessage = !Boolean.getBoolean("qpid.use_legacy_map_message");
 
     /**
      * USed to store the range of in tx messages
@@ -933,4 +939,17 @@
         return AMQMessageDelegateFactory.FACTORY_0_10;
     }
 
+    @ Override
+    public MapMessage createMapMessage() throws JMSException
+    {
+        checkNotClosed();
+        if (useAMQPEncodedMapMessage)
+        {
+            return new AMQPEncodedMapMessage(AMQMessageDelegateFactory.FACTORY_0_10);
+        }
+        else
+        {
+            return new JMSMapMessage(AMQMessageDelegateFactory.FACTORY_0_10);
+        }
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate.java Wed Jan 27 23:44:44 2010
@@ -25,7 +25,10 @@
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
+
+import java.nio.ByteBuffer;
 import java.util.Enumeration;
+import java.util.Map;
 import java.util.UUID;
 
 public interface AMQMessageDelegate
@@ -134,4 +137,8 @@
     long getDeliveryTag();
 
     void setJMSMessageID(final UUID messageId) throws JMSException;
+    
+    ByteBuffer encodeMap(Map<String,Object> map);
+    
+    Map<String,Object> decodeMap(ByteBuffer buf);
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java Wed Jan 27 23:44:44 2010
@@ -21,26 +21,36 @@
 
 package org.apache.qpid.client.message;
 
-import org.apache.commons.collections.map.ReferenceMap;
-import org.apache.qpid.client.*;
-import org.apache.qpid.framing.ContentHeaderProperties;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQPInvalidClassException;
-import org.apache.qpid.jms.Message;
-import org.apache.qpid.url.BindingURL;
-import org.apache.qpid.url.AMQBindingURL;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.qpid.transport.*;
+import java.util.Collections;
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
 
+import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.MessageNotWriteableException;
 import javax.jms.MessageFormatException;
-import javax.jms.DeliveryMode;
-import java.util.*;
+import javax.jms.MessageNotWriteableException;
+
+import org.apache.commons.collections.map.ReferenceMap;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.CustomJMSXProperty;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.Message;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.ExchangeQueryResult;
+import org.apache.qpid.transport.Future;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.transport.codec.BBEncoder;
 
 /**
  * This extends AbstractAMQMessageDelegate which contains common code between
@@ -912,4 +922,18 @@
         return _deliveryProps;
     }
 
+    
+    public java.nio.ByteBuffer encodeMap(Map<String,Object> map)
+    {
+        BBEncoder encoder = new BBEncoder(1024);
+        encoder.writeMap(map);
+        return encoder.segment();
+    }
+    
+    public Map<String,Object> decodeMap(java.nio.ByteBuffer buf)
+    {
+       BBDecoder decoder = new BBDecoder();
+       decoder.init(buf);
+       return decoder.readMap();
+    }
 }

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java Wed Jan 27 23:44:44 2010
@@ -573,5 +573,25 @@
         return _deliveryTag;
     }
 
+    
+    public java.nio.ByteBuffer encodeMap(Map<String,Object> map)
+    {   
+        String errorMsg = "There is no support for encoding maps";
+        if (_session != null)
+        {
+            errorMsg = errorMsg + " in AMQP " + _session.getAMQConnection().getProtocolVersion();
+        }
+        throw new UnsupportedOperationException(errorMsg);
+    }
+    
+    public Map<String,Object> decodeMap(java.nio.ByteBuffer buf)
+    {
+        String errorMsg = "There is no support for encoding maps";
+        if (_session != null)
+        {
+            errorMsg = errorMsg + " in AMQP " + _session.getAMQConnection().getProtocolVersion();
+        }
+        throw new UnsupportedOperationException(errorMsg);
+    }
 
 }

Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java?rev=903911&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessage.java Wed Jan 27 23:44:44 2010
@@ -0,0 +1,73 @@
+package org.apache.qpid.client.message;
+
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+
+public class AMQPEncodedMapMessage extends JMSMapMessage
+{
+    public static final String MIME_TYPE = "amqp/map";
+    
+    public AMQPEncodedMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
+    {
+        this(delegateFactory, null);
+    }
+
+    AMQPEncodedMapMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data) throws JMSException
+    {
+        super(delegateFactory, data); 
+    }
+
+    AMQPEncodedMapMessage(AMQMessageDelegate delegate, ByteBuffer data) throws AMQException
+    {
+        super(delegate, data);
+    }
+    
+    @ Override
+    protected String getMimeType()
+    {
+        return MIME_TYPE;
+    }
+
+    // The super clas methods resets the buffer
+    @ Override
+    public ByteBuffer getData()
+    {
+        writeMapToData();
+        return _data;
+    }
+    
+    @ Override
+    protected void populateMapFromData() throws JMSException
+    {
+        if (_data != null)
+        {
+            _data.rewind();
+            _map = _delegate.decodeMap(_data.buf());
+        }
+        else
+        {
+            _map.clear();
+        }
+    }
+
+    @ Override
+    protected void writeMapToData()
+    {
+        _data = ByteBuffer.wrap(_delegate.encodeMap(_map));
+    }
+    
+    // for testing
+    Map<String,Object> getMap()
+    {
+        return _map;
+    }
+    
+    void setMap(Map<String,Object> map)
+    {
+        _map = map;
+    }
+}

Added: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java?rev=903911&view=auto
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java (added)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageFactory.java Wed Jan 27 23:44:44 2010
@@ -0,0 +1,25 @@
+package org.apache.qpid.client.message;
+
+import javax.jms.JMSException;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.qpid.AMQException;
+
+public class AMQPEncodedMapMessageFactory extends AbstractJMSMessageFactory
+{
+
+    @Override
+    protected AbstractJMSMessage createMessage(AMQMessageDelegate delegate,
+            ByteBuffer data) throws AMQException
+    {
+        return new AMQPEncodedMapMessage(delegate,data);
+    }
+
+    @Override
+    public AbstractJMSMessage createMessage(
+            AMQMessageDelegateFactory delegateFactory) throws JMSException
+    {
+        return new AMQPEncodedMapMessage(delegateFactory);
+    }
+
+}

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Wed Jan 27 23:44:44 2010
@@ -49,7 +49,7 @@
 
 
 
-    private AMQMessageDelegate _delegate;
+    protected AMQMessageDelegate _delegate;
     private boolean _redelivered;
 
     protected AbstractJMSMessage(AMQMessageDelegateFactory delegateFactory, ByteBuffer data)
@@ -379,6 +379,7 @@
             buf.append("\nJMS Destination: ").append(getJMSDestination());
             buf.append("\nJMS Type: ").append(getJMSType());
             buf.append("\nJMS MessageID: ").append(getJMSMessageID());
+            buf.append("\nJMS Content-Type: ").append(getContentType());
             buf.append("\nAMQ message number: ").append(getDeliveryTag());
 
             buf.append("\nProperties:");

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Wed Jan 27 23:44:44 2010
@@ -44,8 +44,7 @@
 
     public static final String MIME_TYPE = "jms/map-message";
 
-
-    private Map<String, Object> _map = new HashMap<String, Object>();
+    protected Map<String, Object> _map = new HashMap<String, Object>();
 
     public JMSMapMessage(AMQMessageDelegateFactory delegateFactory) throws JMSException
     {
@@ -459,7 +458,7 @@
         return _map.containsKey(propName);
     }
 
-    private void populateMapFromData() throws JMSException
+    protected void populateMapFromData() throws JMSException
     {
         if (_data != null)
         {
@@ -479,7 +478,7 @@
         }
     }
 
-    private void writeMapToData()
+    protected void writeMapToData()
     {
         allocateInitialBuffer();
         final int size = _map.size();

Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java?rev=903911&r1=903910&r2=903911&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/MessageFactoryRegistry.java Wed Jan 27 23:44:44 2010
@@ -64,6 +64,7 @@
         mf.registerFactory(JMSBytesMessage.MIME_TYPE, new JMSBytesMessageFactory());
         mf.registerFactory(JMSObjectMessage.MIME_TYPE, new JMSObjectMessageFactory());
         mf.registerFactory(JMSStreamMessage.MIME_TYPE, new JMSStreamMessageFactory());
+        mf.registerFactory(AMQPEncodedMapMessage.MIME_TYPE, new AMQPEncodedMapMessageFactory());
         mf.registerFactory(null, mf._default);
 
         return mf;

Added: qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java?rev=903911&view=auto
==============================================================================
--- qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java (added)
+++ qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/client/message/AMQPEncodedMapMessageTest.java Wed Jan 27 23:44:44 2010
@@ -0,0 +1,138 @@
+package org.apache.qpid.client.message;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.message.AMQPEncodedMapMessage;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.test.utils.QpidTestCase;
+
+
+public class AMQPEncodedMapMessageTest extends QpidTestCase
+{
+    private Connection _connection;
+    private Session _session;
+    MessageConsumer _consumer;
+    MessageProducer _producer;
+    
+    public void setUp() throws Exception
+    {
+        super.setUp();
+        
+        //Create Connection
+        _connection = getConnection();
+        
+        //Create Session
+        _session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+        //Create Queue
+        String queueName = getTestQueueName();
+        Queue queue = _session.createQueue(queueName);
+
+        //Create Consumer
+        _consumer = _session.createConsumer(queue);
+
+        //Create Producer
+        _producer = _session.createProducer(queue);
+
+        _connection.start();
+    }
+
+    public void testEmptyMessage() throws JMSException
+    {
+        if (((AMQConnection)_connection).getProtocolVersion() == ProtocolVersion.v0_10)
+        {
+            MapMessage m = _session.createMapMessage();
+            _producer.send(m);
+            AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Message was not received on time",msg);  
+            assertEquals("Message content-type is incorrect",
+                            AMQPEncodedMapMessage.MIME_TYPE,
+                            ((AbstractJMSMessage)msg).getContentType());
+            
+            assertEquals("Message content should be an empty map",
+                    Collections.EMPTY_MAP,
+                    ((AMQPEncodedMapMessage)msg).getMap());
+    
+        }
+    }
+    
+    public void testNullMessage() throws JMSException
+    {
+        if (((AMQConnection)_connection).getProtocolVersion() == ProtocolVersion.v0_10)
+        {
+            MapMessage m = _session.createMapMessage();
+            ((AMQPEncodedMapMessage)m).setMap(null);
+            _producer.send(m);
+            AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Message was not received on time",msg);  
+            assertEquals("Message content-type is incorrect",
+                            AMQPEncodedMapMessage.MIME_TYPE,
+                            ((AbstractJMSMessage)msg).getContentType());
+            
+            assertEquals("Message content should be null",
+                    null,
+                    ((AMQPEncodedMapMessage)msg).getMap());
+    
+        }
+    }
+
+    public void testMessageWithContent() throws JMSException
+    {
+        if (((AMQConnection)_connection).getProtocolVersion() == ProtocolVersion.v0_10)
+        {
+            MapMessage m = _session.createMapMessage();
+            m.setBoolean("Boolean", true);
+            m.setByte("Byte", (byte)5);
+            byte[] bytes = new byte[]{(byte)5,(byte)8};
+            m.setBytes("Bytes", bytes);
+            m.setChar("Char", 'X');
+            m.setDouble("Double", 56.84);
+            m.setFloat("Float", Integer.MAX_VALUE + 5000);
+            m.setInt("Int", Integer.MAX_VALUE - 5000);
+            m.setShort("Short", (short)58);
+            m.setString("String", "Hello");            
+            _producer.send(m);
+            
+            AMQPEncodedMapMessage msg = (AMQPEncodedMapMessage)_consumer.receive(RECEIVE_TIMEOUT);
+            assertNotNull("Message was not received on time",msg);            
+            assertEquals("Message content-type is incorrect",
+                            AMQPEncodedMapMessage.MIME_TYPE,
+                            ((AbstractJMSMessage)msg).getContentType());
+            
+            assertEquals(true,m.getBoolean("Boolean"));
+            assertEquals((byte)5,m.getByte("Byte"));
+            byte[] bytesRcv = m.getBytes("Bytes");
+            assertNotNull("Byte array is null",bytesRcv);
+            assertEquals((byte)5,bytesRcv[0]);
+            assertEquals((byte)8,bytesRcv[1]);
+            assertEquals('X',m.getChar("Char"));
+            assertEquals(56.84,m.getDouble("Double"));
+            //assertEquals(Integer.MAX_VALUE + 5000,m.getFloat("Float"));
+            assertEquals(Integer.MAX_VALUE - 5000,m.getInt("Int"));
+            assertEquals((short)58,m.getShort("Short"));
+            assertEquals("Hello",m.getString("String"));            
+        }
+    }
+    
+    public void tearDown() throws Exception
+    {
+        //clean up
+        _connection.close();
+
+        super.tearDown();
+    }
+}



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org