You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/11/28 18:15:05 UTC

svn commit: r1546399 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/impl/

Author: robbie
Date: Thu Nov 28 17:15:04 2013
New Revision: 1546399

URL: http://svn.apache.org/r1546399
Log:
QPIDJMS-9: add Message support for JMSTimestamp operations, with associated handling for producers

Added:
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java
Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1546399&r1=1546398&r2=1546399&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Thu Nov 28 17:15:04 2013
@@ -210,6 +210,16 @@ public abstract class AmqpMessage
         _message.setAddress(to);
     }
 
+    public long getCreationTime()
+    {
+        return _message.getCreationTime();
+    }
+
+    public void setCreationTime(long timeInMillis)
+    {
+        _message.setCreationTime(timeInMillis);
+    }
+
     //===== Application Properties ======
 
     private void createApplicationProperties()

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1546399&r1=1546398&r2=1546399&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Thu Nov 28 17:15:04 2013
@@ -132,15 +132,13 @@ public abstract class MessageImpl<T exte
     @Override
     public long getJMSTimestamp() throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        return _amqpMessage.getCreationTime();
     }
 
     @Override
     public void setJMSTimestamp(long timestamp) throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        _amqpMessage.setCreationTime(timestamp);
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1546399&r1=1546398&r2=1546399&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Thu Nov 28 17:15:04 2013
@@ -46,6 +46,11 @@ public class SenderImpl extends LinkImpl
         getConnectionImpl().lock();
         try
         {
+            long timestamp = System.currentTimeMillis();
+
+            //set the timestamp
+            message.setJMSTimestamp(timestamp);
+
             //set the Destination
             message.setJMSDestination(_destination);
 

Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java?rev=1546399&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SenderIntegrationTest.java Thu Nov 28 17:15:04 2013
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+
+import java.util.Calendar;
+import java.util.Date;
+
+import javax.jms.Connection;
+import javax.jms.Message;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.junit.Test;
+
+public class SenderIntegrationTest extends QpidJmsTestCase
+{
+    private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
+
+    @Test
+    public void testSendingMessageSetsJMSDestination() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+            MessageProducer producer = session.createProducer(queue);
+
+            String text = "myMessage";
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+            testPeer.expectTransfer(messageMatcher);
+
+            Message message = session.createTextMessage(text);
+
+            producer.send(message);
+        }
+    }
+
+    @Test
+    public void testSendingMessageSetsJMSTimestamp() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            String queueName = "myQueue";
+            Queue queue = session.createQueue(queueName);
+            MessageProducer producer = session.createProducer(queue);
+
+            Date currentTime = Calendar.getInstance().getTime();
+            String text = "myMessage";
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withCreationTime(greaterThanOrEqualTo(currentTime));
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text));
+            testPeer.expectTransfer(messageMatcher);
+
+            Message message = session.createTextMessage(text);
+
+            producer.send(message);
+        }
+    }
+}

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java?rev=1546399&r1=1546398&r2=1546399&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java Thu Nov 28 17:15:04 2013
@@ -85,7 +85,7 @@ public class SessionIntegrationTest exte
 
             String text = "myMessage";
             MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
-            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
             messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setPropertiesMatcher(propsMatcher);
@@ -143,7 +143,7 @@ public class SessionIntegrationTest exte
             MessageProducer producer = session.createProducer(queue);
 
             MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
-            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true);
             TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
             messageMatcher.setHeadersMatcher(headersMatcher);
             messageMatcher.setPropertiesMatcher(propsMatcher);

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java?rev=1546399&r1=1546398&r2=1546399&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageImplTest.java Thu Nov 28 17:15:04 2013
@@ -480,6 +480,44 @@ public class MessageImplTest extends Qpi
         assertEquals(newQueueExpected, _testMessage.getJMSDestination());
     }
 
+    // ====== JMSTimestamp =======
+
+    @Test
+    public void testGetJMSTimestampOnNewMessage() throws Exception
+    {
+        assertEquals("expected JMSTimestamp value not present", 0, _testMessage.getJMSTimestamp());
+    }
+
+    @Test
+    public void testSetGetJMSTimestampOnNewMessage() throws Exception
+    {
+        long timestamp = System.currentTimeMillis();
+
+        _testMessage.setJMSTimestamp(timestamp);
+        assertEquals("expected JMSTimestamp value not present", timestamp, _testMessage.getJMSTimestamp());
+    }
+
+    @Test
+    public void testSetJMSTimestampOnNewMessage() throws Exception
+    {
+        assertEquals(0, _testAmqpMessage.getCreationTime());
+
+        long timestamp = System.currentTimeMillis();
+        _testMessage.setJMSTimestamp(timestamp);
+
+        assertEquals(timestamp, _testAmqpMessage.getCreationTime());
+    }
+
+    @Test
+    public void testGetJMSTimestampOnRecievedMessageWithCreationTime() throws Exception
+    {
+        long timestamp = System.currentTimeMillis();
+        _testAmqpMessage.setCreationTime(timestamp);
+        _testMessage = new TestMessageImpl(_testAmqpMessage, _mockSessionImpl, _mockConnectionImpl);
+
+        assertEquals("expected JMSTimestamp value not present", timestamp, _testMessage.getJMSTimestamp());
+    }
+
     // ====== utility methods =======
 
     private void assertGetPropertyThrowsMessageFormatException(TestMessageImpl testMessage,

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java?rev=1546399&r1=1546398&r2=1546399&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/SenderImplTest.java Thu Nov 28 17:15:04 2013
@@ -101,4 +101,27 @@ public class SenderImplTest extends Qpid
 
         assertSame(_mockQueue, testMessage.getJMSDestination());
     }
+
+    @Test
+    public void testSenderSetsJMSTimestampOnMessage() throws Exception
+    {
+        //Create mock sent message token, ensure that it is immediately marked as Accepted
+        AmqpSentMessageToken _mockToken = Mockito.mock(AmqpSentMessageToken.class);
+        Mockito.when(_mockToken.getRemoteDeliveryState()).thenReturn(Accepted.getInstance());
+        Mockito.when(_mockAmqpSender.sendMessage(Mockito.any(AmqpMessage.class))).thenReturn(_mockToken);
+        ImmediateWaitUntil.mockWaitUntil(_mockConnection);
+
+        SenderImpl senderImpl = new SenderImpl(_mockSession, _mockConnection, _mockAmqpSender, _mockQueue);
+
+        TestAmqpMessage testAmqpMessage = new TestAmqpMessage();
+        TestMessageImpl testMessage = new TestMessageImpl(testAmqpMessage, null, null);
+
+        assertEquals(0, testMessage.getJMSTimestamp());
+        long timestamp = System.currentTimeMillis();
+
+        senderImpl.send(testMessage);
+
+        //verify the timestamp was set, allowing for a 3second delta
+        assertEquals(timestamp, testMessage.getJMSTimestamp(), 3000);
+    }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org