You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/05/03 22:56:38 UTC
svn commit: r1478976 - in /activemq/trunk/activemq-stomp/src:
main/java/org/apache/activemq/transport/stomp/
test/java/org/apache/activemq/transport/stomp/
Author: tabish
Date: Fri May 3 20:54:31 2013
New Revision: 1478976
URL: http://svn.apache.org/r1478976
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-4468
Modified:
activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
Modified: activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?rev=1478976&r1=1478975&r2=1478976&view=diff
==============================================================================
--- activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (original)
+++ activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Fri May 3 20:54:31 2013
@@ -16,15 +16,16 @@
*/
package org.apache.activemq.transport.stomp;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+
/**
* Implementations of this interface are used to map back and forth from Stomp
* to ActiveMQ. There are several standard mappings which are semantically the
@@ -107,6 +108,13 @@ public interface FrameTranslator {
msg.setJMSExpiration(Long.parseLong((String)o));
}
+ o = headers.remove(Stomp.Headers.Message.TIMESTAMP);
+ if (o != null) {
+ msg.setJMSTimestamp(Long.parseLong((String)o));
+ } else {
+ msg.setJMSTimestamp(System.currentTimeMillis());
+ }
+
o = headers.remove(Stomp.Headers.Send.PRIORITY);
if (o != null) {
msg.setJMSPriority(Integer.parseInt((String)o));
@@ -141,7 +149,6 @@ public interface FrameTranslator {
// be sent back to a STOMP consumer we need to sanitize anything which could be in
// Stomp.Headers.Message and might get passed through to the consumer
headers.remove(Stomp.Headers.Message.MESSAGE_ID);
- headers.remove(Stomp.Headers.Message.TIMESTAMP);
headers.remove(Stomp.Headers.Message.REDELIVERED);
headers.remove(Stomp.Headers.Message.SUBSCRIPTION);
headers.remove(Stomp.Headers.Message.USERID);
Modified: activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1478976&r1=1478975&r2=1478976&view=diff
==============================================================================
--- activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-stomp/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Fri May 3 20:54:31 2013
@@ -128,8 +128,8 @@ public class ProtocolConverter {
private static class AckEntry {
- private String messageId;
- private StompSubscription subscription;
+ private final String messageId;
+ private final StompSubscription subscription;
public AckEntry(String messageId, StompSubscription subscription) {
this.messageId = messageId;
@@ -148,6 +148,7 @@ public class ProtocolConverter {
return this.messageId;
}
+ @SuppressWarnings("unused")
public StompSubscription getSubscription() {
return this.subscription;
}
@@ -168,6 +169,7 @@ public class ProtocolConverter {
final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
if (receiptId != null) {
return new ResponseHandler() {
+ @Override
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// Generally a command can fail.. but that does not invalidate the connection.
@@ -317,7 +319,6 @@ public class ProtocolConverter {
message.setProducerId(producerId);
MessageId id = new MessageId(producerId, messageIdGenerator.getNextSequenceId());
message.setMessageId(id);
- message.setJMSTimestamp(System.currentTimeMillis());
if (stompTx != null) {
TransactionId activemqTx = transactions.get(stompTx);
@@ -634,6 +635,7 @@ public class ProtocolConverter {
consumerInfo.setPrefetchSize(0);
final ResponseHandler handler = new ResponseHandler() {
+ @Override
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
// Generally a command can fail.. but that does not invalidate the connection.
@@ -761,6 +763,7 @@ public class ProtocolConverter {
connectionInfo.setTransportContext(command.getTransportContext());
sendToActiveMQ(connectionInfo, new ResponseHandler() {
+ @Override
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
@@ -776,6 +779,7 @@ public class ProtocolConverter {
final ProducerInfo producerInfo = new ProducerInfo(producerId);
sendToActiveMQ(producerInfo, new ResponseHandler() {
+ @Override
public void onResponse(ProtocolConverter converter, Response response) throws IOException {
if (response.isException()) {
Modified: activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1478976&r1=1478975&r2=1478976&view=diff
==============================================================================
--- activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-stomp/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Fri May 3 20:54:31 2013
@@ -1843,7 +1843,7 @@ public class StompTest extends StompTest
assertFalse("Thisisnotallowed".equals(mess_headers.get(Stomp.Headers.Message.MESSAGE_ID)
));
- assertFalse("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP)));
+ assertTrue("1234".equals(mess_headers.get(Stomp.Headers.Message.TIMESTAMP)));
assertNull(mess_headers.get(Stomp.Headers.Message.REDELIVERED));
assertNull(mess_headers.get(Stomp.Headers.Message.SUBSCRIPTION));
assertEquals("system", mess_headers.get(Stomp.Headers.Message.USERID));