You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2011/12/20 12:06:40 UTC

svn commit: r1221199 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/

Author: dejanb
Date: Tue Dec 20 11:06:40 2011
New Revision: 1221199

URL: http://svn.apache.org/viewvc?rev=1221199&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3496 - reply-to header for Stomp

Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java?rev=1221199&r1=1221198&r2=1221199&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/FrameTranslator.java Tue Dec 20 11:06:40 2011
@@ -16,15 +16,14 @@
  */
 package org.apache.activemq.transport.stomp;
 
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
-
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Implementations of this interface are used to map back and forth from Stomp
@@ -39,7 +38,7 @@ public interface FrameTranslator {
 
     String convertDestination(ProtocolConverter converter, Destination d);
 
-    ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException;
+    ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException;
 
     /**
      * Helper class which holds commonly needed functions used when implementing
@@ -98,7 +97,7 @@ public interface FrameTranslator {
         public static void copyStandardHeadersFromFrameToMessage(ProtocolConverter converter, StompFrame command, ActiveMQMessage msg, FrameTranslator ft) throws ProtocolException, JMSException {
             final Map<String, String> headers = new HashMap<String, String>(command.getHeaders());
             final String destination = headers.remove(Stomp.Headers.Send.DESTINATION);
-            msg.setDestination(ft.convertDestination(converter, destination));
+            msg.setDestination(ft.convertDestination(converter, destination, true));
 
             // the standard JMS headers
             msg.setJMSCorrelationID(headers.remove(Stomp.Headers.Send.CORRELATION_ID));
@@ -122,7 +121,12 @@ public interface FrameTranslator {
 
             o = headers.remove(Stomp.Headers.Send.REPLY_TO);
             if (o != null) {
-                msg.setJMSReplyTo(ft.convertDestination(converter, (String)o));
+                try {
+                    ActiveMQDestination dest = ft.convertDestination(converter, (String)o, false);
+                    msg.setJMSReplyTo(dest);
+                } catch (ProtocolException pe) {
+                    msg.setStringProperty("reply-to", (String)o);
+                }
             }
 
             o = headers.remove(Stomp.Headers.Send.PERSISTENT);

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?rev=1221199&r1=1221198&r2=1221199&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Tue Dec 20 11:06:40 2011
@@ -16,25 +16,19 @@
  */
 package org.apache.activemq.transport.stomp;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-
+import com.thoughtworks.xstream.XStream;
+import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
 import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.command.ActiveMQBytesMessage;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTextMessage;
-import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.*;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.ByteSequence;
 
-import com.thoughtworks.xstream.XStream;
-import com.thoughtworks.xstream.io.json.JsonHierarchicalStreamDriver;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * Implements ActiveMQ 4.0 translations
@@ -167,7 +161,7 @@ public class LegacyFrameTranslator imple
         return buffer.toString();
     }
 
-    public ActiveMQDestination convertDestination(ProtocolConverter converter, String name) throws ProtocolException {
+    public ActiveMQDestination convertDestination(ProtocolConverter converter, String name, boolean forceFallback) throws ProtocolException {
         if (name == null) {
             return null;
         } else if (name.startsWith("/queue/")) {
@@ -187,14 +181,16 @@ public class LegacyFrameTranslator imple
         } else if (name.startsWith("/temp-topic/")) {
             return converter.createTempDestination(name, true);
         } else {
-            try {
-                ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(name);
-                if (fallback != null) {
-                    return fallback;
+            if (forceFallback) {
+                try {
+                    ActiveMQDestination fallback = ActiveMQDestination.getUnresolvableDestinationTransformer().transform(name);
+                    if (fallback != null) {
+                        return fallback;
+                    }
+                } catch (JMSException e) {
+                    throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
+                            + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
                 }
-            } catch (JMSException e) {
-                 throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
-                                        + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/", false, e);
             }
             throw new ProtocolException("Illegal destination name: [" + name + "] -- ActiveMQ STOMP destinations "
                                         + "must begin with one of: /queue/ /topic/ /temp-queue/ /temp-topic/");

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1221199&r1=1221198&r2=1221199&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Tue Dec 20 11:06:40 2011
@@ -16,60 +16,20 @@
  */
 package org.apache.activemq.transport.stomp;
 
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import javax.jms.JMSException;
-
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerContextAware;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQTempQueue;
-import org.apache.activemq.command.ActiveMQTempTopic;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.CommandTypes;
-import org.apache.activemq.command.ConnectionError;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DestinationInfo;
-import org.apache.activemq.command.ExceptionResponse;
-import org.apache.activemq.command.LocalTransactionId;
-import org.apache.activemq.command.MessageAck;
-import org.apache.activemq.command.MessageDispatch;
-import org.apache.activemq.command.MessageId;
-import org.apache.activemq.command.ProducerId;
-import org.apache.activemq.command.ProducerInfo;
-import org.apache.activemq.command.RemoveSubscriptionInfo;
-import org.apache.activemq.command.Response;
-import org.apache.activemq.command.SessionId;
-import org.apache.activemq.command.SessionInfo;
-import org.apache.activemq.command.ShutdownInfo;
-import org.apache.activemq.command.TransactionId;
-import org.apache.activemq.command.TransactionInfo;
+import org.apache.activemq.command.*;
 import org.apache.activemq.util.ByteArrayOutputStream;
-import org.apache.activemq.util.FactoryFinder;
-import org.apache.activemq.util.IOExceptionSupport;
-import org.apache.activemq.util.IdGenerator;
-import org.apache.activemq.util.IntrospectionSupport;
-import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.activemq.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.JMSException;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 /**
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
@@ -485,7 +445,7 @@ public class ProtocolConverter {
             throw new ProtocolException("SUBSCRIBE received without a subscription id!");
         }
 
-        ActiveMQDestination actualDest = translator.convertDestination(this, destination);
+        ActiveMQDestination actualDest = translator.convertDestination(this, destination, true);
 
         if (actualDest == null) {
             throw new ProtocolException("Invalid Destination.");
@@ -511,7 +471,7 @@ public class ProtocolConverter {
 
         IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
 
-        consumerInfo.setDestination(translator.convertDestination(this, destination));
+        consumerInfo.setDestination(translator.convertDestination(this, destination, true));
 
         StompSubscription stompSubscription;
         if (!consumerInfo.isBrowser()) {
@@ -548,7 +508,7 @@ public class ProtocolConverter {
         ActiveMQDestination destination = null;
         Object o = headers.get(Stomp.Headers.Unsubscribe.DESTINATION);
         if (o != null) {
-            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o);
+            destination = findTranslator(command.getHeaders().get(Stomp.Headers.TRANSFORMATION)).convertDestination(this, (String)o, true);
         }
 
         String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java?rev=1221199&r1=1221198&r2=1221199&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/StompTest.java Tue Dec 20 11:06:40 2011
@@ -16,31 +16,6 @@
  */
 package org.apache.activemq.transport.stomp;
 
-import java.io.IOException;
-import java.net.Socket;
-import java.net.SocketTimeoutException;
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.net.UnknownHostException;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import javax.jms.BytesMessage;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.TextMessage;
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
-
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.CombinationTestSupport;
 import org.apache.activemq.broker.BrokerFactory;
@@ -52,6 +27,18 @@ import org.apache.activemq.command.Activ
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.jms.*;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
+import java.io.IOException;
+import java.net.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
 public class StompTest extends CombinationTestSupport {
     private static final Logger LOG = LoggerFactory.getLogger(StompTest.class);
 
@@ -1707,6 +1694,27 @@ public class StompTest extends Combinati
         assertEquals(2, queueView.getDequeueCount());
         assertEquals(0, queueView.getQueueSize());
     }
+    
+    public void testReplytoModification() throws Exception { // AMQ-3496: checks that a free-form reply-to header round-trips through a queue unchanged
+    	String replyto = "some destination"; // deliberately has none of the /queue/, /topic/, /temp-queue/, /temp-topic/ prefixes
+        String frame = "CONNECT\n" + "login: system\n" + "passcode: manager\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = stompConnection.receiveFrame();
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        frame = "SUBSCRIBE\n" + "destination:/queue/" + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL; // subscribe first so the SEND below is delivered back on this connection
+        stompConnection.sendFrame(frame);
+        
+        frame = "SEND\n" + "destination:/queue/" + getQueueName() + "\n" + "reply-to:" + replyto + "\n\nhello world" + Stomp.NULL; // same queue as the subscription, carrying the free-form reply-to value
+        stompConnection.sendFrame(frame);
+
+        StompFrame message = stompConnection.receive();
+        assertTrue(message.getAction().equals("MESSAGE"));
+        assertEquals(replyto, message.getHeaders().get("reply-to")); // the header must come back exactly as it was sent
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
 
     public void testReplyToDestinationNaming() throws Exception {