You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/03 22:56:38 UTC

svn commit: r1478976 - in /activemq/trunk/activemq-stomp/src: main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/

Author: tabish
Date: Fri May  3 20:54:31 2013
New Revision: 1478976

URL: http://svn.apache.org/r1478976
Log:
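Fix for https://issues.apache.org/jira/browse/AMQ-4468: a timestamp header supplied by a STOMP client when sending a message is now used as the message's JMS timestamp (falling back to the current time when the header is absent) instead of being stripped and overwritten.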

Modified:
    activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
    activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?rev=1478976&r1=1478975&r2=1478976&view=diff
==============================================================================
--- activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (original)
+++ activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Fri May  3 20:54:31 2013
@@ -16,15 +16,16 @@
  */
 package org.apache.activemq.transport.stomp;
 
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+
 /**
  * Implementations of this interface are used to map back and forth from Stomp
  * to ActiveMQ. There are several standard mappings which are semantically the
@@ -107,6 +108,13 @@ public interface FrameTranslator {
                 msg.setJMSExpiration(Long.parseLong((String)o));
             }
 
+            o = headers.remove(Stomp.Headers.Message.TIMESTAMP);
+            if (o != null) {
+                msg.setJMSTimestamp(Long.parseLong((String)o));
+            } else {
+                msg.setJMSTimestamp(System.currentTimeMillis());
+            }
+
             o = headers.remove(Stomp.Headers.Send.PRIORITY);
             if (o != null) {
                 msg.setJMSPriority(Integer.parseInt((String)o));
@@ -141,7 +149,6 @@ public interface FrameTranslator {
             // be sent back to a STOMP consumer we need to sanitize anything which could be in
             // Stomp.Headers.Message and might get passed through to the consumer
             headers.remove(Stomp.Headers.Message.MESSAGE_ID);
-            headers.remove(Stomp.Headers.Message.TIMESTAMP);
             headers.remove(Stomp.Headers.Message.REDELIVERED);
             headers.remove(Stomp.Headers.Message.SUBSCRIPTION);
             headers.remove(Stomp.Headers.Message.USERID);

Modified: activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1478976&r1=1478975&r2=1478976&view=diff
==============================================================================
--- activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Fri May  3 20:54:31 2013
@@ -128,8 +128,8 @@ public class ProtocolConverter {
 
     private static class AckEntry {
 
-        private String messageId;
-        private StompSubscription subscription;
+        private final String messageId;
+        private final StompSubscription subscription;
 
         public AckEntry(String messageId, StompSubscription subscription) {
             this.messageId = messageId;
@@ -148,6 +148,7 @@ public class ProtocolConverter {
             return this.messageId;
         }
 
+        @SuppressWarnings("unused")
         public StompSubscription getSubscription() {
             return this.subscription;
         }
@@ -168,6 +169,7 @@ public class ProtocolConverter {
         final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
         if (receiptId != null) {
             return new ResponseHandler() {
+                @Override
                 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
                     if (response.isException()) {
                         // Generally a command can fail.. but that does not invalidate the connection.
@@ -317,7 +319,6 @@ public class ProtocolConverter {
         message.setProducerId(producerId);
         MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
         message.setMessageId(id);
-        message.setJMSTimestamp(System.currentTimeMillis());
 
         if (stompTx != null) {
             TransactionId activemqTx = transactions.get(stompTx);
@@ -634,6 +635,7 @@ public class ProtocolConverter {
             consumerInfo.setPrefetchSize(0);
 
             final ResponseHandler handler = new ResponseHandler() {
+                @Override
                 public void onResponse(ProtocolConverter converter, Response response) throws IOException {
                     if (response.isException()) {
                         // Generally a command can fail.. but that does not invalidate the connection.
@@ -761,6 +763,7 @@ public class ProtocolConverter {
         connectionInfo.setTransportContext(command.getTransportContext());
 
         sendToActiveMQ(connectionInfo, new ResponseHandler() {
+            @Override
             public void onResponse(ProtocolConverter converter, Response response) throws IOException {
 
                 if (response.isException()) {
@@ -776,6 +779,7 @@ public class ProtocolConverter {
 
                 final ProducerInfo producerInfo = new ProducerInfo(producerId);
                 sendToActiveMQ(producerInfo, new ResponseHandler() {
+                    @Override
                     public void onResponse(ProtocolConverter converter, Response response) throws IOException {
 
                         if (response.isException()) {

Modified: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1478976&r1=1478975&r2=1478976&view=diff
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri May  3 20:54:31 2013
@@ -1843,7 +1843,7 @@ public class StompTest extends StompTest
 
         assertFalse("Thisisnotallowed".equals(mess_headers.get(Stomp.Headers.Message.MESSAGE_ID)
                 ));
-        assertFalse("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP)));
+        assertTrue("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP)));
         assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED));
         assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION));
         assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));