You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/16 19:21:49 UTC

svn commit: r487849 - in /incubator/qpid/trunk/qpid/java/client/src: main/java/org/apache/qpid/client/ main/java/org/apache/qpid/client/message/ test/java/org/apache/qpid/test/unit/basic/ test/java/org/apache/qpid/test/unit/topic/

Author: rgreig
Date: Sat Dec 16 10:21:48 2006
New Revision: 487849

URL: http://svn.apache.org/viewvc?view=rev&rev=487849
Log:
QPID-206 : Fix byte buffer resetting in AbstractJMSMessage

Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
    incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=487849&r1=487848&r2=487849
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Sat Dec 16 10:21:48 2006
@@ -76,6 +76,8 @@
      */
     private final ConcurrentHashMap<String, TopicSubscriberAdaptor> _subscriptions =
             new ConcurrentHashMap<String, TopicSubscriberAdaptor>();
+    private final ConcurrentHashMap<BasicMessageConsumer, String> _reverseSubscriptionMap =
+                new ConcurrentHashMap<BasicMessageConsumer, String>();
 
     /**
      * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -107,6 +109,7 @@
      */
     private ConcurrentHashMap<Destination, AtomicInteger> _destinationConsumerCount =
             new ConcurrentHashMap<Destination, AtomicInteger>();
+
     /**
      * Default value for immediate flag used by producers created by this session is false, i.e. a consumer does not
      * need to be attached to a queue
@@ -1205,7 +1208,9 @@
         }
 
         subscriber = new TopicSubscriberAdaptor(dest, (BasicMessageConsumer) createConsumer(dest));
+
         _subscriptions.put(name,subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
 
         return subscriber;
     }
@@ -1236,6 +1241,7 @@
         BasicMessageConsumer consumer = (BasicMessageConsumer) createConsumer(dest, messageSelector, noLocal);
         TopicSubscriberAdaptor subscriber = new TopicSubscriberAdaptor(dest, consumer);
         _subscriptions.put(name,subscriber);
+        _reverseSubscriptionMap.put(subscriber.getMessageConsumer(),name);
         return subscriber;
     }
 
@@ -1280,6 +1286,7 @@
             // send a queue.delete for the subscription
             deleteQueue(AMQTopic.getDurableTopicQueueName(name, _connection));
             _subscriptions.remove(name);
+            _reverseSubscriptionMap.remove(subscriber);
         }
         else
         {
@@ -1443,6 +1450,12 @@
     void deregisterConsumer(BasicMessageConsumer consumer)
     {
         _consumers.remove(consumer.getConsumerTag());
+        String subscriptionName = _reverseSubscriptionMap.remove(consumer);
+        if(subscriptionName != null)
+        {
+            _subscriptions.remove(subscriptionName);    
+        }
+
         Destination dest = consumer.getDestination();
         synchronized(dest)
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?view=diff&rev=487849&r1=487848&r2=487849
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Sat Dec 16 10:21:48 2006
@@ -35,10 +35,10 @@
 class TopicSubscriberAdaptor implements TopicSubscriber
 {
     private final Topic _topic;
-    private final MessageConsumer _consumer;
+    private final BasicMessageConsumer _consumer;
     private final boolean _noLocal;
 
-    TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal)
+    TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
     {
         _topic = topic;
         _consumer = consumer;
@@ -119,4 +119,10 @@
 			throw new javax.jms.IllegalStateException("Invalid Session");
 		}
 	}
+
+    BasicMessageConsumer getMessageConsumer()
+    {
+        return _consumer;
+    }
+
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=487849&r1=487848&r2=487849
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Sat Dec 16 10:21:48 2006
@@ -479,14 +479,7 @@
         // position beyond the start
         if (_data != null)
         {
-            if (!_readableMessage)
-            {
-                _data.flip();
-            }
-            else
-            {
-                _data.rewind();
-            }
+            reset();
         }
         return _data;
     }
@@ -525,7 +518,7 @@
         return !_readableMessage;
     }
 
-    public void reset() throws JMSException
+    public void reset() 
     {
         if (_readableMessage)
         {

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java?view=diff&rev=487849&r1=487848&r2=487849
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/basic/TextMessageTest.java Sat Dec 16 10:21:48 2006
@@ -21,11 +21,8 @@
 package org.apache.qpid.test.unit.basic;
 
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
 import org.apache.qpid.client.AMQQueue;
 import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
 import org.apache.qpid.client.message.JMSTextMessage;
 import org.apache.qpid.testutil.VMBrokerSetup;
 import org.apache.log4j.Logger;

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java?view=diff&rev=487849&r1=487848&r2=487849
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/topic/TopicSessionTest.java Sat Dec 16 10:21:48 2006
@@ -200,7 +200,29 @@
         con.close();
     }
 
-    public void testTempoaryTopic() throws Exception
+    public void testSendingSameMessage() throws Exception
+    {
+        AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
+        TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+        TemporaryTopic topic = session.createTemporaryTopic();
+        assertNotNull(topic);
+        TopicPublisher producer = session.createPublisher(topic);
+        MessageConsumer consumer = session.createConsumer(topic);
+        conn.start();
+        TextMessage sentMessage = session.createTextMessage("Test Message");
+        producer.send(sentMessage);
+        TextMessage receivedMessage = (TextMessage) consumer.receive(2000);
+        assertNotNull(receivedMessage);
+        assertEquals(sentMessage.getText(),receivedMessage.getText());
+        producer.send(sentMessage);
+        receivedMessage = (TextMessage) consumer.receive(2000);
+        assertNotNull(receivedMessage);
+        assertEquals(sentMessage.getText(),receivedMessage.getText());
+
+
+    }
+
+    public void testTemporaryTopic() throws Exception
     {
         AMQConnection conn = new AMQConnection("vm://:1?retries='0'", "guest", "guest", "test", "/test");
         TopicSession session = conn.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);