You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2011/08/12 22:29:31 UTC

svn commit: r1157238 [2/3] - in /activemq/trunk: activemq-core/ activemq-core/src/main/filtered-resources/ activemq-core/src/main/filtered-resources/org/ activemq-core/src/main/filtered-resources/org/apache/ activemq-core/src/main/filtered-resources/or...

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Fri Aug 12 20:29:29 2011
@@ -16,10 +16,16 @@
  */
 package org.apache.activemq.transport.stomp;
 
+import java.io.BufferedReader;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
 import java.io.OutputStreamWriter;
 import java.io.PrintWriter;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -34,6 +40,7 @@ import org.apache.activemq.command.Activ
 import org.apache.activemq.command.ActiveMQTempQueue;
 import org.apache.activemq.command.ActiveMQTempTopic;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.CommandTypes;
 import org.apache.activemq.command.ConnectionError;
 import org.apache.activemq.command.ConnectionId;
 import org.apache.activemq.command.ConnectionInfo;
@@ -62,7 +69,6 @@ import org.apache.activemq.util.Introspe
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.context.ApplicationContextAware;
 
 /**
  * @author <a href="http://hiramchirino.com">chirino</a>
@@ -70,9 +76,29 @@ import org.springframework.context.Appli
 public class ProtocolConverter {
 
     private static final Logger LOG = LoggerFactory.getLogger(ProtocolConverter.class);
-    
+
     private static final IdGenerator CONNECTION_ID_GENERATOR = new IdGenerator();
 
+    private static final String BROKER_VERSION;
+    private static final StompFrame ping = new StompFrame(Stomp.Commands.KEEPALIVE);
+
+    private static final long DEFAULT_OUTBOUND_HEARTBEAT = 100;
+    private static final long DEFAULT_INBOUND_HEARTBEAT = 1000;
+    private static final long DEFAULT_INITIAL_HEARTBEAT_DELAY = 1000;
+
+    static {
+        InputStream in = null;
+        String version = "5.6.0";
+        if ((in = ProtocolConverter.class.getResourceAsStream("/org/apache/activemq/version.txt")) != null) {
+            BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+            try {
+                version = reader.readLine();
+            } catch(Exception e) {
+            }
+        }
+        BROKER_VERSION = version;
+    }
+
     private final ConnectionId connectionId = new ConnectionId(CONNECTION_ID_GENERATOR.generateId());
     private final SessionId sessionId = new SessionId(connectionId, -1);
     private final ProducerId producerId = new ProducerId(sessionId, 1);
@@ -84,6 +110,7 @@ public class ProtocolConverter {
 
     private final ConcurrentHashMap<Integer, ResponseHandler> resposeHandlers = new ConcurrentHashMap<Integer, ResponseHandler>();
     private final ConcurrentHashMap<ConsumerId, StompSubscription> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, StompSubscription>();
+    private final ConcurrentHashMap<String, StompSubscription> subscriptions = new ConcurrentHashMap<String, StompSubscription>();
     private final ConcurrentHashMap<String, ActiveMQDestination> tempDestinations = new ConcurrentHashMap<String, ActiveMQDestination>();
     private final ConcurrentHashMap<String, String> tempDestinationAmqToStompMap = new ConcurrentHashMap<String, String>();
     private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
@@ -92,13 +119,15 @@ public class ProtocolConverter {
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
     private final AtomicBoolean connected = new AtomicBoolean(false);
-    private final FrameTranslator frameTranslator;
+    private final FrameTranslator frameTranslator = new LegacyFrameTranslator();
     private final FactoryFinder FRAME_TRANSLATOR_FINDER = new FactoryFinder("META-INF/services/org/apache/activemq/transport/frametranslator/");
     private final BrokerContext brokerContext;
+    private String version = "1.0";
+    private long hbReadInterval = DEFAULT_INBOUND_HEARTBEAT;
+    private long hbWriteInterval = DEFAULT_OUTBOUND_HEARTBEAT;
 
-    public ProtocolConverter(StompTransport stompTransport, FrameTranslator translator, BrokerContext brokerContext) {
+    public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
         this.stompTransport = stompTransport;
-        this.frameTranslator = translator;
         this.brokerContext = brokerContext;
     }
 
@@ -178,6 +207,8 @@ public class ProtocolConverter {
                 onStompSend(command);
             } else if (action.startsWith(Stomp.Commands.ACK)) {
                 onStompAck(command);
+            } else if (action.startsWith(Stomp.Commands.NACK)) {
+                onStompNack(command);
             } else if (action.startsWith(Stomp.Commands.BEGIN)) {
                 onStompBegin(command);
             } else if (action.startsWith(Stomp.Commands.COMMIT)) {
@@ -188,7 +219,8 @@ public class ProtocolConverter {
                 onStompSubscribe(command);
             } else if (action.startsWith(Stomp.Commands.UNSUBSCRIBE)) {
                 onStompUnsubscribe(command);
-            } else if (action.startsWith(Stomp.Commands.CONNECT)) {
+            } else if (action.startsWith(Stomp.Commands.CONNECT) ||
+                       action.startsWith(Stomp.Commands.STOMP)) {
                 onStompConnect(command);
             } else if (action.startsWith(Stomp.Commands.DISCONNECT)) {
                 onStompDisconnect(command);
@@ -199,7 +231,7 @@ public class ProtocolConverter {
         } catch (ProtocolException e) {
             handleException(e, command);
             // Some protocol errors can cause the connection to get closed.
-            if( e.isFatal() ) {
+            if (e.isFatal()) {
                getStompTransport().onException(e);
             }
         }
@@ -219,6 +251,7 @@ public class ProtocolConverter {
 
         HashMap<String, String> headers = new HashMap<String, String>();
         headers.put(Stomp.Headers.Error.MESSAGE, exception.getMessage());
+        headers.put(Stomp.Headers.CONTENT_TYPE, "text/plain");
 
         if (command != null) {
             final String receiptId = command.getHeaders().get(Stomp.Headers.RECEIPT_REQUESTED);
@@ -235,6 +268,11 @@ public class ProtocolConverter {
         checkConnected();
 
         Map<String, String> headers = command.getHeaders();
+        String destination = headers.get(Stomp.Headers.Send.DESTINATION);
+        if (destination == null) {
+            throw new ProtocolException("SEND received without a Destination specified!");
+        }
+
         String stompTx = headers.get(Stomp.Headers.TRANSACTION);
         headers.remove("transaction");
 
@@ -255,24 +293,64 @@ public class ProtocolConverter {
 
         message.onSend();
         sendToActiveMQ(message, createResponseHandler(command));
+    }
+
+    protected void onStompNack(StompFrame command) throws ProtocolException {
 
+        checkConnected();
+
+        if (this.version.equals(Stomp.V1_1)) {
+            throw new ProtocolException("NACK received but connection is in v1.0 mode.");
+        }
+
+        Map<String, String> headers = command.getHeaders();
+
+        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
+        if (subscriptionId == null) {
+            throw new ProtocolException("NACK received without a subscription id for acknowledge!");
+        }
+
+        String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
+        if (messageId == null) {
+            throw new ProtocolException("NACK received without a message-id to acknowledge!");
+        }
+
+        TransactionId activemqTx = null;
+        String stompTx = headers.get(Stomp.Headers.TRANSACTION);
+        if (stompTx != null) {
+            activemqTx = transactions.get(stompTx);
+            if (activemqTx == null) {
+                throw new ProtocolException("Invalid transaction id: " + stompTx);
+            }
+        }
+
+        if (subscriptionId != null) {
+            StompSubscription sub = this.subscriptions.get(subscriptionId);
+            if (sub != null) {
+                MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
+                if (ack != null) {
+                    sendToActiveMQ(ack, createResponseHandler(command));
+                } else {
+                    throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
+                }
+            }
+        }
     }
 
     protected void onStompAck(StompFrame command) throws ProtocolException {
         checkConnected();
 
-        // TODO: acking with just a message id is very bogus
-        // since the same message id could have been sent to 2 different
-        // subscriptions
-        // on the same stomp connection. For example, when 2 subs are created on
-        // the same topic.
-
         Map<String, String> headers = command.getHeaders();
         String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
         if (messageId == null) {
             throw new ProtocolException("ACK received without a message-id to acknowledge!");
         }
 
+        String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
+        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+            throw new ProtocolException("ACK received without a subscription id for acknowledge!");
+        }
+
         TransactionId activemqTx = null;
         String stompTx = headers.get(Stomp.Headers.TRANSACTION);
         if (stompTx != null) {
@@ -283,21 +361,37 @@ public class ProtocolConverter {
         }
 
         boolean acked = false;
-        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
-            StompSubscription sub = iter.next();
-            MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
-            if (ack != null) {
-                ack.setTransactionId(activemqTx);
-                sendToActiveMQ(ack, createResponseHandler(command));
-                acked = true;
-                break;
+
+        if (subscriptionId != null) {
+
+            StompSubscription sub = this.subscriptions.get(subscriptionId);
+            if (sub != null) {
+                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
+                if (ack != null) {
+                    sendToActiveMQ(ack, createResponseHandler(command));
+                    acked = true;
+                }
+            }
+
+        } else {
+
+            // TODO: acking with just a message id is very bogus since the same message id
+            // could have been sent to 2 different subscriptions on the same Stomp connection.
+            // For example, when 2 subs are created on the same topic.
+
+            for (StompSubscription sub : subscriptionsByConsumerId.values()) {
+                MessageAck ack = sub.onStompMessageAck(messageId, activemqTx);
+                if (ack != null) {
+                    sendToActiveMQ(ack, createResponseHandler(command));
+                    acked = true;
+                    break;
+                }
             }
         }
 
         if (!acked) {
             throw new ProtocolException("Unexpected ACK received for message-id [" + messageId + "]");
         }
-
     }
 
     protected void onStompBegin(StompFrame command) throws ProtocolException {
@@ -324,7 +418,6 @@ public class ProtocolConverter {
         tx.setType(TransactionInfo.BEGIN);
 
         sendToActiveMQ(tx, createResponseHandler(command));
-
     }
 
     protected void onStompCommit(StompFrame command) throws ProtocolException {
@@ -342,8 +435,7 @@ public class ProtocolConverter {
             throw new ProtocolException("Invalid transaction id: " + stompTx);
         }
 
-        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
-            StompSubscription sub = iter.next();
+        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
             sub.onStompCommit(activemqTx);
         }
 
@@ -353,7 +445,6 @@ public class ProtocolConverter {
         tx.setType(TransactionInfo.COMMIT_ONE_PHASE);
 
         sendToActiveMQ(tx, createResponseHandler(command));
-
     }
 
     protected void onStompAbort(StompFrame command) throws ProtocolException {
@@ -369,8 +460,7 @@ public class ProtocolConverter {
         if (activemqTx == null) {
             throw new ProtocolException("Invalid transaction id: " + stompTx);
         }
-        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
-            StompSubscription sub = iter.next();
+        for (StompSubscription sub : subscriptionsByConsumerId.values()) {
             try {
                 sub.onStompAbort(activemqTx);
             } catch (Exception e) {
@@ -384,7 +474,6 @@ public class ProtocolConverter {
         tx.setType(TransactionInfo.ROLLBACK);
 
         sendToActiveMQ(tx, createResponseHandler(command));
-
     }
 
     protected void onStompSubscribe(StompFrame command) throws ProtocolException {
@@ -395,6 +484,10 @@ public class ProtocolConverter {
         String subscriptionId = headers.get(Stomp.Headers.Subscribe.ID);
         String destination = headers.get(Stomp.Headers.Subscribe.DESTINATION);
 
+        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+            throw new ProtocolException("SUBSCRIBE received without a subscription id!");
+        }
+
         ActiveMQDestination actualDest = translator.convertDestination(this, destination);
 
         if (actualDest == null) {
@@ -406,6 +499,16 @@ public class ProtocolConverter {
         consumerInfo.setPrefetchSize(1000);
         consumerInfo.setDispatchAsync(true);
 
+        String browser = headers.get(Stomp.Headers.Subscribe.BROWSER);
+        if (browser != null && browser.equals(Stomp.TRUE)) {
+
+            if (!this.version.equals(Stomp.V1_1)) {
+                throw new ProtocolException("Queue Browser feature only valid for Stomp v1.1 clients!");
+            }
+
+            consumerInfo.setBrowser(true);
+        }
+
         String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
         consumerInfo.setSelector(selector);
 
@@ -413,7 +516,12 @@ public class ProtocolConverter {
 
         consumerInfo.setDestination(translator.convertDestination(this, destination));
 
-        StompSubscription stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+        StompSubscription stompSubscription;
+        if (!consumerInfo.isBrowser()) {
+            stompSubscription = new StompSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+        } else {
+            stompSubscription = new StompQueueBrowserSubscription(this, subscriptionId, consumerInfo, headers.get(Stomp.Headers.TRANSFORMATION));
+        }
         stompSubscription.setDestination(actualDest);
 
         String ackMode = headers.get(Stomp.Headers.Subscribe.ACK_MODE);
@@ -426,8 +534,11 @@ public class ProtocolConverter {
         }
 
         subscriptionsByConsumerId.put(id, stompSubscription);
+        // Stomp v1.0 doesn't need to set this header so we avoid an NPE if not set.
+        if (subscriptionId != null) {
+            subscriptions.put(subscriptionId, stompSubscription);
+        }
         sendToActiveMQ(consumerInfo, createResponseHandler(command));
-
     }
 
     protected void onStompUnsubscribe(StompFrame command) throws ProtocolException {
@@ -441,6 +552,9 @@ public class ProtocolConverter {
         }
 
         String subscriptionId = headers.get(Stomp.Headers.Unsubscribe.ID);
+        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+            throw new ProtocolException("UNSUBSCRIBE received without a subscription id!");
+        }
 
         if (subscriptionId == null && destination == null) {
             throw new ProtocolException("Must specify the subscriptionId or the destination you are unsubscribing from");
@@ -457,18 +571,26 @@ public class ProtocolConverter {
             return;
         }
 
-        // TODO: Unsubscribing using a destination is a bit wierd if multiple
-        // subscriptions
-        // are created with the same destination. Perhaps this should be
-        // removed.
-        //
-        for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
-            StompSubscription sub = iter.next();
-            if ((subscriptionId != null && subscriptionId.equals(sub.getSubscriptionId())) || (destination != null && destination.equals(sub.getDestination()))) {
+        if (subscriptionId != null) {
+
+            StompSubscription sub = this.subscriptions.remove(subscriptionId);
+            if (sub != null) {
                 sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
-                iter.remove();
                 return;
             }
+
+        } else {
+
+            // Unsubscribing using a destination is a bit weird if multiple subscriptions
+            // are created with the same destination.
+            for (Iterator<StompSubscription> iter = subscriptionsByConsumerId.values().iterator(); iter.hasNext();) {
+                StompSubscription sub = iter.next();
+                if (destination != null && destination.equals(sub.getDestination())) {
+                    sendToActiveMQ(sub.getConsumerInfo().createRemoveCommand(), createResponseHandler(command));
+                    iter.remove();
+                    return;
+                }
+            }
         }
 
         throw new ProtocolException("No subscription matched.");
@@ -488,10 +610,28 @@ public class ProtocolConverter {
         String login = headers.get(Stomp.Headers.Connect.LOGIN);
         String passcode = headers.get(Stomp.Headers.Connect.PASSCODE);
         String clientId = headers.get(Stomp.Headers.Connect.CLIENT_ID);
+        String heartBeat = headers.get(Stomp.Headers.Connect.HEART_BEAT);
+        String accepts = headers.get(Stomp.Headers.Connect.ACCEPT_VERSION);
 
+        if (accepts == null) {
+            accepts = Stomp.DEFAULT_VERSION;
+        }
+        if (heartBeat == null) {
+            heartBeat = Stomp.DEFAULT_HEART_BEAT;
+        }
 
-        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
+        HashSet<String> acceptsVersions = new HashSet<String>(Arrays.asList(accepts.split(Stomp.COMMA)));
+        acceptsVersions.retainAll(Arrays.asList(Stomp.SUPPORTED_PROTOCOL_VERSIONS));
+        if (acceptsVersions.isEmpty()) {
+            throw new ProtocolException("Invlid Protocol version, supported versions are: " +
+                                        Arrays.toString(Stomp.SUPPORTED_PROTOCOL_VERSIONS), true);
+        } else {
+            this.version = Collections.max(acceptsVersions);
+        }
 
+        configureInactivityMonitor(heartBeat);
+
+        IntrospectionSupport.setProperties(connectionInfo, headers, "activemq.");
         connectionInfo.setConnectionId(connectionId);
         if (clientId != null) {
             connectionInfo.setClientId(clientId);
@@ -544,10 +684,22 @@ public class ProtocolConverter {
                             responseHeaders.put(Stomp.Headers.Response.RECEIPT_ID, requestId);
                         }
 
+                        responseHeaders.put(Stomp.Headers.Connected.VERSION, version);
+                        responseHeaders.put(Stomp.Headers.Connected.HEART_BEAT,
+                                            String.format("%d,%d", hbWriteInterval, hbReadInterval));
+                        responseHeaders.put(Stomp.Headers.Connected.SERVER, "ActiveMQ/"+BROKER_VERSION);
+
                         StompFrame sc = new StompFrame();
                         sc.setAction(Stomp.Responses.CONNECTED);
                         sc.setHeaders(responseHeaders);
                         sendToStomp(sc);
+
+                        if (version.equals(Stomp.V1_1)) {
+                            StompWireFormat format = stompTransport.getWireFormat();
+                            if (format != null) {
+                                format.setEncodingEnabled(true);
+                            }
+                        }
                     }
                 });
 
@@ -576,7 +728,6 @@ public class ProtocolConverter {
      */
     public void onActiveMQCommand(Command command) throws IOException, JMSException {
         if (command.isResponse()) {
-
             Response response = (Response)command;
             ResponseHandler rh = resposeHandlers.remove(Integer.valueOf(response.getCorrelationId()));
             if (rh != null) {
@@ -589,12 +740,13 @@ public class ProtocolConverter {
                 }
             }
         } else if (command.isMessageDispatch()) {
-
             MessageDispatch md = (MessageDispatch)command;
             StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
             if (sub != null) {
                 sub.onMessageDispatch(md);
             }
+        } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
+            stompTransport.sendToStomp(ping);
         } else if (command.getDataStructureType() == ConnectionError.DATA_STRUCTURE_TYPE) {
             // Pass down any unexpected async errors. Should this close the connection?
             Throwable exception = ((ConnectionError)command).getException();
@@ -643,4 +795,49 @@ public class ProtocolConverter {
     public String getCreatedTempDestinationName(ActiveMQDestination destination) {
         return tempDestinationAmqToStompMap.get(destination.getQualifiedName());
     }
+
+    protected void configureInactivityMonitor(String heartBeatConfig) throws ProtocolException {
+
+        String[] keepAliveOpts = heartBeatConfig.split(Stomp.COMMA);
+
+        if (keepAliveOpts == null || keepAliveOpts.length != 2) {
+            throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true);
+        } else {
+
+            try {
+                hbReadInterval = Long.parseLong(keepAliveOpts[0]);
+                hbWriteInterval = Long.parseLong(keepAliveOpts[1]);
+            } catch(NumberFormatException e) {
+                throw new ProtocolException("Invlid heart-beat header:" + heartBeatConfig, true);
+            }
+
+            if (hbReadInterval > 0) {
+                hbReadInterval = Math.max(DEFAULT_INBOUND_HEARTBEAT, hbReadInterval);
+                hbReadInterval += Math.min(hbReadInterval, 5000);
+            }
+
+            if (hbWriteInterval > 0) {
+                hbWriteInterval = Math.max(DEFAULT_OUTBOUND_HEARTBEAT, hbWriteInterval);
+            }
+
+            try {
+
+                StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
+
+                monitor.setReadCheckTime(hbReadInterval);
+                monitor.setInitialDelayTime(DEFAULT_INITIAL_HEARTBEAT_DELAY);
+                monitor.setWriteCheckTime(hbWriteInterval);
+
+                monitor.startMonitoring();
+
+            } catch(Exception ex) {
+                hbReadInterval = 0;
+                hbWriteInterval = 0;
+            }
+
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Stomp Connect heartbeat conf RW[" + hbReadInterval + "," + hbWriteInterval + "]");
+            }
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java Fri Aug 12 20:29:29 2011
@@ -20,7 +20,29 @@ public interface Stomp {
     String NULL = "\u0000";
     String NEWLINE = "\n";
 
+    byte BREAK = '\n';
+    byte COLON = ':';
+    byte ESCAPE = '\\';
+    byte[] ESCAPE_ESCAPE_SEQ = { 92, 92 };
+    byte[] COLON_ESCAPE_SEQ = { 92, 99 };
+    byte[] NEWLINE_ESCAPE_SEQ = { 92, 110 };
+
+    String COMMA = ",";
+    String V1_0 = "1.0";
+    String V1_1 = "1.1";
+    String DEFAULT_HEART_BEAT = "0,0";
+    String DEFAULT_VERSION = "1.0";
+    String EMPTY = "";
+
+    String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.1", "1.0"};
+
+    String TEXT_PLAIN = "text/plain";
+    String TRUE = "true";
+    String FALSE = "false";
+    String END = "end";
+
     public static interface Commands {
+        String STOMP = "STOMP";
         String CONNECT = "CONNECT";
         String SEND = "SEND";
         String DISCONNECT = "DISCONNECT";
@@ -34,6 +56,8 @@ public interface Stomp {
         String COMMIT = "COMMIT";
         String ABORT = "ABORT";
         String ACK = "ACK";
+        String NACK = "NACK";
+        String KEEPALIVE = "KEEPALIVE";
     }
 
     public interface Responses {
@@ -48,8 +72,10 @@ public interface Stomp {
         String RECEIPT_REQUESTED = "receipt";
         String TRANSACTION = "transaction";
         String CONTENT_LENGTH = "content-length";
+        String CONTENT_TYPE = "content-type";
         String TRANSFORMATION = "transformation";
         String TRANSFORMATION_ERROR = "transformation-error";
+
         /**
          * This header is used to instruct ActiveMQ to construct the message
          * based with a specific type.
@@ -81,6 +107,7 @@ public interface Stomp {
             String TIMESTAMP = "timestamp";
             String TYPE = "type";
             String SUBSCRIPTION = "subscription";
+            String BROWSER = "browser";
             String USERID = "JMSXUserID";
             String ORIGINAL_DESTINATION = "original-destination";
         }
@@ -90,6 +117,7 @@ public interface Stomp {
             String ACK_MODE = "ack";
             String ID = "id";
             String SELECTOR = "selector";
+            String BROWSER = "browser";
 
             public interface AckModeValues {
                 String AUTO = "auto";
@@ -108,6 +136,9 @@ public interface Stomp {
             String PASSCODE = "passcode";
             String CLIENT_ID = "client-id";
             String REQUEST_ID = "request-id";
+            String ACCEPT_VERSION = "accept-version";
+            String HOST = "host";
+            String HEART_BEAT = "heart-beat";
         }
 
         public interface Error {
@@ -117,30 +148,34 @@ public interface Stomp {
         public interface Connected {
             String SESSION = "session";
             String RESPONSE_ID = "response-id";
+            String SERVER = "server";
+            String VERSION = "version";
+            String HEART_BEAT = "heart-beat";
         }
 
         public interface Ack {
             String MESSAGE_ID = "message-id";
+            String SUBSCRIPTION = "subscription";
         }
     }
 
-	public enum Transformations {
-		JMS_BYTE,
-		JMS_XML,
-		JMS_JSON,
-		JMS_OBJECT_XML,
-		JMS_OBJECT_JSON,
-		JMS_MAP_XML,
-		JMS_MAP_JSON,
-		JMS_ADVISORY_XML,
-		JMS_ADVISORY_JSON;
-
-		public String toString() {
-			return name().replaceAll("_", "-").toLowerCase();
-		}
-
-		public static Transformations getValue(String value) {
-			return valueOf(value.replaceAll("-", "_").toUpperCase());
-		}
-	}
+    public enum Transformations {
+        JMS_BYTE,
+        JMS_XML,
+        JMS_JSON,
+        JMS_OBJECT_XML,
+        JMS_OBJECT_JSON,
+        JMS_MAP_XML,
+        JMS_MAP_JSON,
+        JMS_ADVISORY_XML,
+        JMS_ADVISORY_JSON;
+
+        public String toString() {
+            return name().replaceAll("_", "-").toLowerCase();
+        }
+
+        public static Transformations getValue(String value) {
+            return valueOf(value.replaceAll("-", "_").toUpperCase());
+        }
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompCodec.java Fri Aug 12 20:29:29 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.st
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.util.ByteArrayOutputStream;
 import org.apache.activemq.util.DataByteArrayInputStream;
-import org.apache.activemq.wireformat.WireFormat;
 
 import java.io.ByteArrayInputStream;
 import java.util.HashMap;
@@ -41,55 +40,55 @@ public class StompCodec {
     }
 
     public void parse(ByteArrayInputStream input, int readSize) throws Exception {
-               int i = 0;
-               int b;
-               while(i++ < readSize) {
-                   b = input.read();
-                   // skip repeating nulls
-                   if (!processedHeaders && previousByte == 0 && b == 0) {
-                       continue;
+       int i = 0;
+       int b;
+       while(i++ < readSize) {
+           b = input.read();
+           // skip repeating nulls
+           if (!processedHeaders && previousByte == 0 && b == 0) {
+               continue;
+           }
+
+           if (!processedHeaders) {
+               currentCommand.write(b);
+               // end of headers section, parse action and header
+               if (previousByte == '\n' && b == '\n') {
+                   if (transport.getWireFormat() instanceof StompWireFormat) {
+                       DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
+                       action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
+                       headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
+                       String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
+                       if (contentLengthHeader != null) {
+                           contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
+                       } else {
+                           contentLength = -1;
+                       }
                    }
+                   processedHeaders = true;
+                   currentCommand.reset();
+               }
+           } else {
 
-                   if (!processedHeaders) {
+               if (contentLength == -1) {
+                   // end of command reached, unmarshal
+                   if (b == 0) {
+                       processCommand();
+                   } else {
                        currentCommand.write(b);
-                       // end of headers section, parse action and header
-                       if (previousByte == '\n' && b == '\n') {
-                           if (transport.getWireFormat() instanceof StompWireFormat) {
-                               DataByteArrayInputStream data = new DataByteArrayInputStream(currentCommand.toByteArray());
-                               action = ((StompWireFormat)transport.getWireFormat()).parseAction(data);
-                               headers = ((StompWireFormat)transport.getWireFormat()).parseHeaders(data);
-                               String contentLengthHeader = headers.get(Stomp.Headers.CONTENT_LENGTH);
-                               if (contentLengthHeader != null) {
-                                   contentLength = ((StompWireFormat)transport.getWireFormat()).parseContentLength(contentLengthHeader);
-                               } else {
-                                   contentLength = -1;
-                               }
-                           }
-                           processedHeaders = true;
-                           currentCommand.reset();
-                       }
+                   }
+               } else {
+                   // read desired content length
+                   if (readLength++ == contentLength) {
+                       processCommand();
+                       readLength = 0;
                    } else {
-
-                       if (contentLength == -1) {
-                           // end of command reached, unmarshal
-                           if (b == 0) {
-                               processCommand();
-                           } else {
-                               currentCommand.write(b);
-                           }
-                       } else {
-                           // read desired content length
-                           if (readLength++ == contentLength) {
-                               processCommand();
-                               readLength = 0;
-                           } else {
-                               currentCommand.write(b);
-                           }
-                       }
+                       currentCommand.write(b);
                    }
-
-                   previousByte = b;
                }
+           }
+
+           previousByte = b;
+       }
     }
 
     protected void processCommand() throws Exception {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompConnection.java Fri Aug 12 20:29:29 2011
@@ -38,7 +38,7 @@ public class StompConnection {
     }
 
     public void open(Socket socket) {
-    	stompSocket = socket;
+        stompSocket = socket;
     }
 
     public void close() throws IOException {
@@ -70,8 +70,8 @@ public class StompConnection {
     }
 
     public StompFrame receive(long timeOut) throws Exception {
-    	stompSocket.setSoTimeout((int)timeOut);
-    	InputStream is = stompSocket.getInputStream();
+        stompSocket.setSoTimeout((int)timeOut);
+        InputStream is = stompSocket.getInputStream();
         StompWireFormat wf = new StompWireFormat();
         DataInputStream dis = new DataInputStream(is);
         return (StompFrame)wf.unmarshal(dis);
@@ -104,143 +104,143 @@ public class StompConnection {
         }
     }
 
-	private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
-	    byte[] ba = inputBuffer.toByteArray();
+    private String stringFromBuffer(ByteArrayOutputStream inputBuffer) throws Exception {
+        byte[] ba = inputBuffer.toByteArray();
         inputBuffer.reset();
         return new String(ba, "UTF-8");
     }
 
     public Socket getStompSocket() {
-		return stompSocket;
-	}
+        return stompSocket;
+    }
 
-	public void setStompSocket(Socket stompSocket) {
-		this.stompSocket = stompSocket;
-	}
+    public void setStompSocket(Socket stompSocket) {
+        this.stompSocket = stompSocket;
+    }
 
     public void connect(String username, String password) throws Exception {
         connect(username, password, null);
     }
 
     public void connect(String username, String password, String client) throws Exception {
-    	HashMap<String, String> headers = new HashMap();
-    	headers.put("login", username);
-    	headers.put("passcode", password);
-    	if (client != null) {
-    		headers.put("client-id", client);
-    	}
-    	StompFrame frame = new StompFrame("CONNECT", headers);
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("login", username);
+        headers.put("passcode", password);
+        if (client != null) {
+            headers.put("client-id", client);
+        }
+        StompFrame frame = new StompFrame("CONNECT", headers);
         sendFrame(frame.format());
 
         StompFrame connect = receive();
         if (!connect.getAction().equals(Stomp.Responses.CONNECTED)) {
-        	throw new Exception ("Not connected: " + connect.getBody());
+            throw new Exception ("Not connected: " + connect.getBody());
         }
     }
 
     public void disconnect() throws Exception {
-    	StompFrame frame = new StompFrame("DISCONNECT");
+        StompFrame frame = new StompFrame("DISCONNECT");
         sendFrame(frame.format());
     }
 
     public void send(String destination, String message) throws Exception {
-    	send(destination, message, null, null);
+        send(destination, message, null, null);
     }
 
     public void send(String destination, String message, String transaction, HashMap<String, String> headers) throws Exception {
-    	if (headers == null) {
-    		headers = new HashMap<String, String>();
-    	}
-    	headers.put("destination", destination);
-    	if (transaction != null) {
-    		headers.put("transaction", transaction);
-    	}
-    	StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
+        if (headers == null) {
+            headers = new HashMap<String, String>();
+        }
+        headers.put("destination", destination);
+        if (transaction != null) {
+            headers.put("transaction", transaction);
+        }
+        StompFrame frame = new StompFrame("SEND", headers, message.getBytes());
         sendFrame(frame.format());
     }
 
     public void subscribe(String destination) throws Exception {
-    	subscribe(destination, null, null);
+        subscribe(destination, null, null);
     }
 
     public void subscribe(String destination, String ack) throws Exception {
-    	subscribe(destination, ack, new HashMap<String, String>());
+        subscribe(destination, ack, new HashMap<String, String>());
     }
 
     public void subscribe(String destination, String ack, HashMap<String, String> headers) throws Exception {
-		if (headers == null) {
-			headers = new HashMap<String, String>();
-		}
-		headers.put("destination", destination);
-    	if (ack != null) {
-    		headers.put("ack", ack);
-    	}
-    	StompFrame frame = new StompFrame("SUBSCRIBE", headers);
+        if (headers == null) {
+            headers = new HashMap<String, String>();
+        }
+        headers.put("destination", destination);
+        if (ack != null) {
+            headers.put("ack", ack);
+        }
+        StompFrame frame = new StompFrame("SUBSCRIBE", headers);
         sendFrame(frame.format());
     }
 
     public void unsubscribe(String destination) throws Exception {
-    	unsubscribe(destination, null);
+        unsubscribe(destination, null);
     }
 
     public void unsubscribe(String destination, HashMap<String, String> headers) throws Exception {
-		if (headers == null) {
-			headers = new HashMap<String, String>();
-		}
-		headers.put("destination", destination);
-    	StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
+        if (headers == null) {
+            headers = new HashMap<String, String>();
+        }
+        headers.put("destination", destination);
+        StompFrame frame = new StompFrame("UNSUBSCRIBE", headers);
         sendFrame(frame.format());
-    }    
-    
+    }
+
     public void begin(String transaction) throws Exception {
-    	HashMap<String, String> headers = new HashMap<String, String>();
-    	headers.put("transaction", transaction);
-    	StompFrame frame = new StompFrame("BEGIN", headers);
-    	sendFrame(frame.format());
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("transaction", transaction);
+        StompFrame frame = new StompFrame("BEGIN", headers);
+        sendFrame(frame.format());
     }
 
     public void abort(String transaction) throws Exception {
-    	HashMap<String, String> headers = new HashMap<String, String>();
-    	headers.put("transaction", transaction);
-    	StompFrame frame = new StompFrame("ABORT", headers);
-    	sendFrame(frame.format());
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("transaction", transaction);
+        StompFrame frame = new StompFrame("ABORT", headers);
+        sendFrame(frame.format());
     }
 
     public void commit(String transaction) throws Exception {
-    	HashMap<String, String> headers = new HashMap<String, String>();
-    	headers.put("transaction", transaction);
-    	StompFrame frame = new StompFrame("COMMIT", headers);
-    	sendFrame(frame.format());
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("transaction", transaction);
+        StompFrame frame = new StompFrame("COMMIT", headers);
+        sendFrame(frame.format());
     }
 
     public void ack(StompFrame frame) throws Exception {
-    	ack(frame.getHeaders().get("message-id"), null);
+        ack(frame.getHeaders().get("message-id"), null);
     }
 
     public void ack(StompFrame frame, String transaction) throws Exception {
-    	ack(frame.getHeaders().get("message-id"), transaction);
+        ack(frame.getHeaders().get("message-id"), transaction);
     }
 
     public void ack(String messageId) throws Exception {
-    	ack(messageId, null);
+        ack(messageId, null);
     }
 
     public void ack(String messageId, String transaction) throws Exception {
-    	HashMap<String, String> headers = new HashMap<String, String>();
-    	headers.put("message-id", messageId);
-    	if (transaction != null)
-    		headers.put("transaction", transaction);
-    	StompFrame frame = new StompFrame("ACK", headers);
-    	sendFrame(frame.format());
+        HashMap<String, String> headers = new HashMap<String, String>();
+        headers.put("message-id", messageId);
+        if (transaction != null)
+            headers.put("transaction", transaction);
+        StompFrame frame = new StompFrame("ACK", headers);
+        sendFrame(frame.format());
     }
 
     protected String appendHeaders(HashMap<String, Object> headers) {
-    	StringBuffer result = new StringBuffer();
-    	for (String key : headers.keySet()) {
-    		result.append(key + ":" + headers.get(key) + "\n");
-    	}
-    	result.append("\n");
-    	return result.toString();
+        StringBuilder result = new StringBuilder();
+        for (String key : headers.keySet()) {
+            result.append(key + ":" + headers.get(key) + "\n");
+        }
+        result.append("\n");
+        return result.toString();
     }
 
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompFrame.java Fri Aug 12 20:29:29 2011
@@ -19,7 +19,6 @@ package org.apache.activemq.transport.st
 import java.io.UnsupportedEncodingException;
 import java.util.Arrays;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.activemq.command.Command;
@@ -180,12 +179,11 @@ public class StompFrame implements Comma
     }
 
     public String format(boolean forLogging) {
-        StringBuffer buffer = new StringBuffer();
+        StringBuilder buffer = new StringBuilder();
         buffer.append(getAction());
         buffer.append("\n");
-        Map headers = getHeaders();
-        for (Iterator iter = headers.entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry)iter.next();
+        Map<String, String> headers = getHeaders();
+        for (Map.Entry<String, String> entry : headers.entrySet()) {
             buffer.append(entry.getKey());
             buffer.append(":");
             if (forLogging && entry.getKey().toString().toLowerCase().contains(Stomp.Headers.Connect.PASSCODE)) {

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java?rev=1157238&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java Fri Aug 12 20:29:29 2011
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.transport.AbstractInactivityMonitor;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.wireformat.WireFormat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Used to make sure that commands are arriving periodically from the peer of
+ * the transport.
+ */
+public class StompInactivityMonitor extends AbstractInactivityMonitor {
+
+    private static final Logger LOG = LoggerFactory.getLogger(StompInactivityMonitor.class);
+
+    private boolean isConfigured = false;
+
+    public StompInactivityMonitor(Transport next, WireFormat wireFormat) {
+        super(next, wireFormat);
+    }
+
+    public void startMonitoring() throws IOException {
+        this.isConfigured = true;
+        this.startMonitorThreads();
+    }
+
+    @Override
+    protected void processInboundWireFormatInfo(WireFormatInfo info) throws IOException {
+    }
+
+    @Override
+    protected void processOutboundWireFormatInfo(WireFormatInfo info) throws IOException{
+    }
+
+    @Override
+    protected boolean configuredOk() throws IOException {
+
+        if (!isConfigured) {
+            return false;
+        }
+
+        LOG.debug("Stomp Inactivity Monitor read check: " + getReadCheckTime() +
+                  ", write check: " + getWriteCheckTime());
+
+        if (this.getReadCheckTime() >= 0 && this.getWriteCheckTime() >= 0) {
+            return true;
+        }
+
+        return false;
+    }
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompInactivityMonitor.java
------------------------------------------------------------------------------
    svn:executable = *

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransport.java Fri Aug 12 20:29:29 2011
@@ -17,12 +17,10 @@
 package org.apache.activemq.transport.stomp;
 
 import org.apache.activemq.transport.nio.NIOSSLTransport;
-import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;
 
 import javax.net.SocketFactory;
 import java.io.ByteArrayInputStream;
-import java.io.EOFException;
 import java.io.IOException;
 import java.net.Socket;
 import java.net.URI;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOSSLTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -54,15 +54,15 @@ public class StompNIOSSLTransportFactory
         return new StompNIOSSLTransport(wf, socketFactory, location, localLocation);
     }
 
-        @Override
+    @Override
     public TransportServer doBind(URI location) throws IOException {
-         if (SslContext.getCurrentSslContext() != null) {
-             try {
-                 context = SslContext.getCurrentSslContext().getSSLContext();
-             } catch (Exception e) {
-                 throw new IOException(e);
-             }
-         }
+       if (SslContext.getCurrentSslContext() != null) {
+            try {
+                context = SslContext.getCurrentSslContext().getSSLContext();
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
         return super.doBind(location);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransport.java Fri Aug 12 20:29:29 2011
@@ -26,19 +26,14 @@ import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
-import java.util.HashMap;
 
 import javax.net.SocketFactory;
 
-import org.apache.activemq.command.Command;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.nio.NIOOutputStream;
 import org.apache.activemq.transport.nio.SelectorManager;
 import org.apache.activemq.transport.nio.SelectorSelection;
 import org.apache.activemq.transport.tcp.TcpTransport;
-import org.apache.activemq.util.ByteArrayOutputStream;
-import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.util.DataByteArrayInputStream;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.activemq.wireformat.WireFormat;
@@ -46,7 +41,7 @@ import org.apache.activemq.wireformat.Wi
 /**
  * An implementation of the {@link Transport} interface for using Stomp over NIO
  *
- * 
+ *
  */
 public class StompNIOTransport extends TcpTransport {
 
@@ -133,7 +128,7 @@ public class StompNIOTransport extends T
         try {
             selection.close();
         } catch (Exception e) {
-        	e.printStackTrace();
+            e.printStackTrace();
         }
         super.doStop(stopper);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompNIOTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -35,12 +35,11 @@ import org.apache.activemq.transport.tcp
 import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.xbean.XBeanBrokerService;
 
 /**
  * A <a href="http://stomp.codehaus.org/">STOMP</a> over NIO transport factory
- * 
- * 
+ *
+ *
  */
 public class StompNIOTransportFactory extends NIOTransportFactory implements BrokerServiceAware {
 
@@ -60,20 +59,15 @@ public class StompNIOTransportFactory ex
 
     protected TcpTransport createTcpTransport(WireFormat wf, SocketFactory socketFactory, URI location, URI localLocation) throws UnknownHostException, IOException {
         return new StompNIOTransport(wf, socketFactory, location, localLocation);
-    }  
+    }
 
+    @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), brokerContext);
+        transport = new StompTransportFilter(transport, format, brokerContext);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
 
-    protected boolean isUseInactivityMonitor(Transport transport) {
-        // lets disable the inactivity monitor as stomp does not use keep alive
-        // packets
-        return false;
-    }
-    
     public void setBrokerService(BrokerService brokerService) {
         this.brokerContext = brokerService.getBrokerContext();
     }

Added: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java?rev=1157238&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java (added)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java Fri Aug 12 20:29:29 2011
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.TransactionId;
+
+public class StompQueueBrowserSubscription extends StompSubscription {
+
+    public StompQueueBrowserSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
+        super(stompTransport, subscriptionId, consumerInfo, transformation);
+    }
+
+    @Override
+    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+
+        if (md.getMessage() != null) {
+            super.onMessageDispatch(md);
+        } else {
+            StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
+            browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());
+            browseDone.getHeaders().put(Stomp.Headers.Message.BROWSER, "end");
+            browseDone.getHeaders().put(Stomp.Headers.Message.DESTINATION,
+                    protocolConverter.findTranslator(null).convertDestination(protocolConverter, this.destination));
+            browseDone.getHeaders().put(Stomp.Headers.Message.MESSAGE_ID, "0");
+
+            protocolConverter.sendToStomp(browseDone);
+        }
+    }
+
+    @Override
+    public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
+        throw new ProtocolException("Cannot Nack a message on a Queue Browser Subscription.");
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSslTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -28,8 +28,8 @@ import org.apache.activemq.wireformat.Wi
 
 /**
  * A <a href="http://stomp.codehaus.org/">STOMP</a> over SSL transport factory
- * 
- * 
+ *
+ *
  */
 public class StompSslTransportFactory extends SslTransportFactory implements BrokerServiceAware {
 
@@ -39,8 +39,9 @@ public class StompSslTransportFactory ex
         return "stomp";
     }
 
+    @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), brokerContext);
+        transport = new StompTransportFilter(transport, format, brokerContext);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Fri Aug 12 20:29:29 2011
@@ -45,17 +45,16 @@ public class StompSubscription {
     public static final String CLIENT_ACK = Stomp.Headers.Subscribe.AckModeValues.CLIENT;
     public static final String INDIVIDUAL_ACK = Stomp.Headers.Subscribe.AckModeValues.INDIVIDUAL;
 
-    private final ProtocolConverter protocolConverter;
-    private final String subscriptionId;
-    private final ConsumerInfo consumerInfo;
-
-    private final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
-    private final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
-
-    private String ackMode = AUTO_ACK;
-    private ActiveMQDestination destination;
-    private String transformation;
-
+    protected final ProtocolConverter protocolConverter;
+    protected final String subscriptionId;
+    protected final ConsumerInfo consumerInfo;
+
+    protected final LinkedHashMap<MessageId, MessageDispatch> dispatchedMessage = new LinkedHashMap<MessageId, MessageDispatch>();
+    protected final LinkedList<MessageDispatch> unconsumedMessage = new LinkedList<MessageDispatch>();
+
+    protected String ackMode = AUTO_ACK;
+    protected ActiveMQDestination destination;
+    protected String transformation;
 
     public StompSubscription(ProtocolConverter stompTransport, String subscriptionId, ConsumerInfo consumerInfo, String transformation) {
         this.protocolConverter = stompTransport;
@@ -82,12 +81,12 @@ public class StompSubscription {
         boolean ignoreTransformation = false;
 
         if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
-       		message.setReadOnlyProperties(false);
-        	message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
+               message.setReadOnlyProperties(false);
+            message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
         } else {
-        	if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
-        		ignoreTransformation = true;
-        	}
+            if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
+                ignoreTransformation = true;
+            }
         }
 
         StompFrame command = protocolConverter.convertMessage(message, ignoreTransformation);
@@ -101,24 +100,25 @@ public class StompSubscription {
     }
 
     synchronized void onStompAbort(TransactionId transactionId) {
-    	unconsumedMessage.clear();
+        unconsumedMessage.clear();
     }
 
     synchronized void onStompCommit(TransactionId transactionId) {
-    	for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+        for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+            @SuppressWarnings("rawtypes")
             Map.Entry entry = (Entry)iter.next();
-            MessageId id = (MessageId)entry.getKey();
             MessageDispatch msg = (MessageDispatch)entry.getValue();
             if (unconsumedMessage.contains(msg)) {
-            	iter.remove();
+                iter.remove();
             }
-    	}
-    	unconsumedMessage.clear();
+        }
+
+        unconsumedMessage.clear();
     }
 
     synchronized MessageAck onStompMessageAck(String messageId, TransactionId transactionId) {
 
-    	MessageId msgId = new MessageId(messageId);
+        MessageId msgId = new MessageId(messageId);
 
         if (!dispatchedMessage.containsKey(msgId)) {
             return null;
@@ -129,10 +129,11 @@ public class StompSubscription {
         ack.setConsumerId(consumerInfo.getConsumerId());
 
         if (ackMode == CLIENT_ACK) {
-        	ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
+            ack.setAckType(MessageAck.STANDARD_ACK_TYPE);
             int count = 0;
-            for (Iterator iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
+            for (Iterator<?> iter = dispatchedMessage.entrySet().iterator(); iter.hasNext();) {
 
+                @SuppressWarnings("rawtypes")
                 Map.Entry entry = (Entry)iter.next();
                 MessageId id = (MessageId)entry.getKey();
                 MessageDispatch msg = (MessageDispatch)entry.getValue();
@@ -142,39 +143,59 @@ public class StompSubscription {
                 }
 
                 if (transactionId != null) {
-                	if (!unconsumedMessage.contains(msg)) {
-                		unconsumedMessage.add(msg);
-                	}
+                    if (!unconsumedMessage.contains(msg)) {
+                        unconsumedMessage.add(msg);
+                    }
                 } else {
-                	iter.remove();
+                    iter.remove();
                 }
 
-
                 count++;
 
                 if (id.equals(msgId)) {
                     ack.setLastMessageId(id);
                     break;
                 }
-
             }
             ack.setMessageCount(count);
             if (transactionId != null) {
-            	ack.setTransactionId(transactionId);
+                ack.setTransactionId(transactionId);
             }
-        }
-        else if (ackMode == INDIVIDUAL_ACK) {
+
+        } else if (ackMode == INDIVIDUAL_ACK) {
             ack.setAckType(MessageAck.INDIVIDUAL_ACK_TYPE);
             ack.setMessageID(msgId);
             if (transactionId != null) {
-            	unconsumedMessage.add(dispatchedMessage.get(msgId));
-            	ack.setTransactionId(transactionId);
+                unconsumedMessage.add(dispatchedMessage.get(msgId));
+                ack.setTransactionId(transactionId);
             }
             dispatchedMessage.remove(msgId);
         }
         return ack;
     }
 
+    public MessageAck onStompMessageNack(String messageId, TransactionId transactionId) throws ProtocolException {
+
+        MessageId msgId = new MessageId(messageId);
+
+        if (!dispatchedMessage.containsKey(msgId)) {
+            return null;
+        }
+
+        MessageAck ack = new MessageAck();
+        ack.setDestination(consumerInfo.getDestination());
+        ack.setConsumerId(consumerInfo.getConsumerId());
+        ack.setAckType(MessageAck.POSION_ACK_TYPE);
+        ack.setMessageID(msgId);
+        if (transactionId != null) {
+            unconsumedMessage.add(dispatchedMessage.get(msgId));
+            ack.setTransactionId(transactionId);
+        }
+        dispatchedMessage.remove(msgId);
+
+        return null;
+    }
+
     public String getAckMode() {
         return ackMode;
     }
@@ -198,5 +219,4 @@ public class StompSubscription {
     public ConsumerInfo getConsumerInfo() {
         return consumerInfo;
     }
-
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransport.java Fri Aug 12 20:29:29 2011
@@ -28,11 +28,14 @@ import org.apache.activemq.command.Comma
 public interface StompTransport {
 
     public void sendToActiveMQ(Command command);
-    
+
     public void sendToStomp(StompFrame command) throws IOException;
-    
+
     public X509Certificate[] getPeerCertificates();
-    
+
     public void onException(IOException error);
-    
+
+    public StompInactivityMonitor getInactivityMonitor();
+
+    public StompWireFormat getWireFormat();
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -16,48 +16,47 @@
  */
 package org.apache.activemq.transport.stomp;
 
-import java.io.IOException;
-import java.net.URI;
-import java.net.URISyntaxException;
 import java.util.Map;
 
-import javax.net.ServerSocketFactory;
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
-import org.apache.activemq.transport.tcp.TcpTransportServer;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.wireformat.WireFormat;
-import org.apache.activemq.xbean.XBeanBrokerService;
 
 /**
  * A <a href="http://stomp.codehaus.org/">STOMP</a> transport factory
- * 
- * 
+ *
+ *
  */
 public class StompTransportFactory extends TcpTransportFactory implements BrokerServiceAware {
 
-	private BrokerContext brokerContext = null;
-	
+    private BrokerContext brokerContext = null;
+
     protected String getDefaultWireFormatType() {
         return "stomp";
     }
 
+    @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
-        transport = new StompTransportFilter(transport, new LegacyFrameTranslator(), brokerContext);
+        transport = new StompTransportFilter(transport, format, brokerContext);
         IntrospectionSupport.setProperties(transport, options);
         return super.compositeConfigure(transport, format, options);
     }
 
-    protected boolean isUseInactivityMonitor(Transport transport) {
-        // lets disable the inactivity monitor as stomp does not use keep alive
-        // packets
-        return false;
+    public void setBrokerService(BrokerService brokerService) {
+        this.brokerContext = brokerService.getBrokerContext();
     }
 
-	public void setBrokerService(BrokerService brokerService) {
-	    this.brokerContext = brokerService.getBrokerContext();
-	}
+    @Override
+    protected Transport createInactivityMonitor(Transport transport, WireFormat format) {
+        StompInactivityMonitor monitor = new StompInactivityMonitor(transport, format);
+
+        StompTransportFilter filter = (StompTransportFilter) transport.narrow(StompTransportFilter.class);
+        filter.setInactivityMonitor(monitor);
+
+        return monitor;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompTransportFilter.java Fri Aug 12 20:29:29 2011
@@ -28,6 +28,7 @@ import org.apache.activemq.transport.Tra
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
+import org.apache.activemq.wireformat.WireFormat;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,20 +37,24 @@ import org.slf4j.LoggerFactory;
  * configured with the StompWireFormat and is used to convert STOMP commands to
  * ActiveMQ commands. All of the conversion work is done by delegating to the
  * ProtocolConverter.
- * 
+ *
  * @author <a href="http://hiramchirino.com">chirino</a>
  */
 public class StompTransportFilter extends TransportFilter implements StompTransport {
     private static final Logger LOG = LoggerFactory.getLogger(StompTransportFilter.class);
     private final ProtocolConverter protocolConverter;
-    private final FrameTranslator frameTranslator;
+    private StompInactivityMonitor monitor;
+    private StompWireFormat wireFormat;
 
     private boolean trace;
 
-    public StompTransportFilter(Transport next, FrameTranslator translator, BrokerContext brokerContext) {
+    public StompTransportFilter(Transport next, WireFormat wireFormat, BrokerContext brokerContext) {
         super(next);
-        this.frameTranslator = translator;
-        this.protocolConverter = new ProtocolConverter(this, translator, brokerContext);
+        this.protocolConverter = new ProtocolConverter(this, brokerContext);
+
+        if (wireFormat instanceof StompWireFormat) {
+            this.wireFormat = (StompWireFormat) wireFormat;
+        }
     }
 
     public void oneway(Object o) throws IOException {
@@ -66,7 +71,7 @@ public class StompTransportFilter extend
             if (trace) {
                 LOG.trace("Received: \n" + command);
             }
-           
+
             protocolConverter.onStompCommand((StompFrame)command);
         } catch (IOException e) {
             onException(e);
@@ -92,21 +97,17 @@ public class StompTransportFilter extend
         }
     }
 
-    public FrameTranslator getFrameTranslator() {
-        return frameTranslator;
-    }
-
     public X509Certificate[] getPeerCertificates() {
-    	if(next instanceof SslTransport) {    	
-    		X509Certificate[] peerCerts = ((SslTransport)next).getPeerCertificates();
-    		if (trace && peerCerts != null) {
+        if(next instanceof SslTransport) {
+            X509Certificate[] peerCerts = ((SslTransport)next).getPeerCertificates();
+            if (trace && peerCerts != null) {
                 LOG.debug("Peer Identity has been verified\n");
             }
-    		return peerCerts;
-    	}
-    	return null;
+            return peerCerts;
+        }
+        return null;
     }
-    
+
     public boolean isTrace() {
         return trace;
     }
@@ -114,4 +115,18 @@ public class StompTransportFilter extend
     public void setTrace(boolean trace) {
         this.trace = trace;
     }
+
+    @Override
+    public StompInactivityMonitor getInactivityMonitor() {
+        return monitor;
+    }
+
+    public void setInactivityMonitor(StompInactivityMonitor monitor) {
+        this.monitor = monitor;
+    }
+
+    @Override
+    public StompWireFormat getWireFormat() {
+        return this.wireFormat;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Fri Aug 12 20:29:29 2011
@@ -21,8 +21,9 @@ import java.io.DataInputStream;
 import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.io.PushbackInputStream;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.activemq.util.ByteArrayInputStream;
@@ -44,6 +45,7 @@ public class StompWireFormat implements 
     private static final int MAX_HEADERS = 1000;
     private static final int MAX_DATA_LENGTH = 1024 * 1024 * 100;
 
+    private boolean encodingEnabled = false;
     private int version = 1;
 
     public ByteSequence marshal(Object command) throws IOException {
@@ -63,16 +65,20 @@ public class StompWireFormat implements 
     public void marshal(Object command, DataOutput os) throws IOException {
         StompFrame stomp = (org.apache.activemq.transport.stomp.StompFrame)command;
 
-        StringBuffer buffer = new StringBuffer();
+        if (stomp.getAction().equals(Stomp.Commands.KEEPALIVE)) {
+            os.write(Stomp.BREAK);
+            return;
+        }
+
+        StringBuilder buffer = new StringBuilder();
         buffer.append(stomp.getAction());
         buffer.append(Stomp.NEWLINE);
 
         // Output the headers.
-        for (Iterator iter = stomp.getHeaders().entrySet().iterator(); iter.hasNext();) {
-            Map.Entry entry = (Map.Entry)iter.next();
+        for (Map.Entry<String, String> entry : stomp.getHeaders().entrySet()) {
             buffer.append(entry.getKey());
             buffer.append(Stomp.Headers.SEPERATOR);
-            buffer.append(entry.getValue());
+            buffer.append(encodeHeader(entry.getValue()));
             buffer.append(Stomp.NEWLINE);
         }
 
@@ -87,7 +93,7 @@ public class StompWireFormat implements 
     public Object unmarshal(DataInput in) throws IOException {
 
         try {
-            
+
             // parse action
             String action = parseAction(in);
 
@@ -129,7 +135,6 @@ public class StompWireFormat implements 
                     baos.close();
                     data = baos.toByteArray();
                 }
-
             }
 
             return new StompFrame(action, headers, data);
@@ -137,10 +142,14 @@ public class StompWireFormat implements 
         } catch (ProtocolException e) {
             return new StompFrameError(e);
         }
-
     }
 
     private String readLine(DataInput in, int maxLength, String errorMessage) throws IOException {
+        ByteSequence sequence = readHeaderLine(in, maxLength, errorMessage);
+        return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8").trim();
+    }
+
+    private ByteSequence readHeaderLine(DataInput in, int maxLength, String errorMessage) throws IOException {
         byte b;
         ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
         while ((b = in.readByte()) != '\n') {
@@ -150,10 +159,9 @@ public class StompWireFormat implements 
             baos.write(b);
         }
         baos.close();
-        ByteSequence sequence = baos.toByteSequence();
-        return new String(sequence.getData(), sequence.getOffset(), sequence.getLength(), "UTF-8");
+        return baos.toByteSequence();
     }
-    
+
     protected String parseAction(DataInput in) throws IOException {
         String action = null;
 
@@ -171,21 +179,35 @@ public class StompWireFormat implements 
         }
         return action;
     }
-    
+
     protected HashMap<String, String> parseHeaders(DataInput in) throws IOException {
         HashMap<String, String> headers = new HashMap<String, String>(25);
         while (true) {
-            String line = readLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
-            if (line != null && line.trim().length() > 0) {
+            ByteSequence line = readHeaderLine(in, MAX_HEADER_LENGTH, "The maximum header length was exceeded");
+            if (line != null && line.length > 0) {
 
                 if (headers.size() > MAX_HEADERS) {
                     throw new ProtocolException("The maximum number of headers was exceeded", true);
                 }
 
                 try {
-                    int seperatorIndex = line.indexOf(Stomp.Headers.SEPERATOR);
-                    String name = line.substring(0, seperatorIndex).trim();
-                    String value = line.substring(seperatorIndex + 1, line.length()).trim();
+
+                    ByteArrayInputStream headerLine = new ByteArrayInputStream(line);
+                    ByteArrayOutputStream stream = new ByteArrayOutputStream(line.length);
+
+                    // First complete the name
+                    int result = -1;
+                    while ((result = headerLine.read()) != -1) {
+                        if (result != ':') {
+                            stream.write(result);
+                        } else {
+                            break;
+                        }
+                    }
+
+                    ByteSequence nameSeq = stream.toByteSequence();
+                    String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8").trim();
+                    String value = decodeHeader(headerLine).trim();
                     headers.put(name, value);
                 } catch (Exception e) {
                     throw new ProtocolException("Unable to parser header line [" + line + "]", true);
@@ -193,10 +215,10 @@ public class StompWireFormat implements 
             } else {
                 break;
             }
-        }     
+        }
         return headers;
     }
-    
+
     protected int parseContentLength(String contentLength) throws ProtocolException {
         int length;
         try {
@@ -208,10 +230,71 @@ public class StompWireFormat implements 
         if (length > MAX_DATA_LENGTH) {
             throw new ProtocolException("The maximum data length was exceeded", true);
         }
-        
+
         return length;
     }
 
+    private String encodeHeader(String header) throws IOException {
+        String result = header;
+        if (this.encodingEnabled) {
+            byte[] utf8buf = header.getBytes("UTF-8");
+            ByteArrayOutputStream stream = new ByteArrayOutputStream(utf8buf.length);
+            for(byte val : utf8buf) {
+                switch(val) {
+                case Stomp.ESCAPE:
+                    stream.write(Stomp.ESCAPE_ESCAPE_SEQ);
+                    break;
+                case Stomp.BREAK:
+                    stream.write(Stomp.NEWLINE_ESCAPE_SEQ);
+                    break;
+                case Stomp.COLON:
+                    stream.write(Stomp.COLON_ESCAPE_SEQ);
+                    break;
+                default:
+                    stream.write(val);
+                }
+            }
+        }
+
+        return result;
+    }
+
+    private String decodeHeader(InputStream header) throws IOException {
+        ByteArrayOutputStream decoded = new ByteArrayOutputStream();
+        PushbackInputStream stream = new PushbackInputStream(header);
+
+        int value = -1;
+        while( (value = stream.read()) != -1) {
+            if (value == 92) {
+
+                int next = stream.read();
+                if (next != -1) {
+                    switch(next) {
+                    case 110:
+                        decoded.write(Stomp.BREAK);
+                        break;
+                    case 99:
+                        decoded.write(Stomp.COLON);
+                        break;
+                    case 92:
+                        decoded.write(Stomp.ESCAPE);
+                        break;
+                    default:
+                        stream.unread(next);
+                        decoded.write(value);
+                    }
+                } else {
+                    decoded.write(value);
+                }
+
+            } else {
+                decoded.write(value);
+            }
+        }
+
+        return new String(decoded.toByteArray(), "UTF-8");
+    }
+
     public int getVersion() {
         return version;
     }
@@ -220,4 +303,12 @@ public class StompWireFormat implements 
         this.version = version;
     }
 
+    public boolean isEncodingEnabled() {
+        return this.encodingEnabled;
+    }
+
+    public void setEncodingEnabled(boolean value) {
+        this.encodingEnabled = value;
+    }
+
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java?rev=1157238&r1=1157237&r2=1157238&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/tcp/SslTransportFactory.java Fri Aug 12 20:29:29 2011
@@ -20,9 +20,6 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
-import java.security.KeyManagementException;
-import java.security.NoSuchAlgorithmException;
-import java.security.NoSuchProviderException;
 import java.security.SecureRandom;
 import java.util.HashMap;
 import java.util.Map;
@@ -30,21 +27,13 @@ import java.util.Map;
 import javax.net.ServerSocketFactory;
 import javax.net.SocketFactory;
 import javax.net.ssl.KeyManager;
-import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLServerSocketFactory;
 import javax.net.ssl.SSLSocketFactory;
 import javax.net.ssl.TrustManager;
 
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.broker.BrokerServiceAware;
 import org.apache.activemq.broker.SslContext;
-import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportLoggerFactory;
 import org.apache.activemq.transport.TransportServer;
-import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
 import org.apache.activemq.util.URISupport;
@@ -57,15 +46,15 @@ import org.slf4j.LoggerFactory;
  * contribution from this class is that it is aware of SslTransportServer and
  * SslTransport classes. All Transports and TransportServers created from this
  * factory will have their needClientAuth option set to false.
- * 
+ *
  * @author sepandm@gmail.com (Sepand)
  * @author David Martin Clavo david(dot)martin(dot)clavo(at)gmail.com (logging improvement modifications)
- * 
+ *
  */
 public class SslTransportFactory extends TcpTransportFactory {
     // The log this uses.,
     private static final Logger LOG = LoggerFactory.getLogger(SslTransportFactory.class);
-    
+
     /**
      * Overriding to use SslTransportServer and allow for proper reflection.
      */
@@ -91,6 +80,7 @@ public class SslTransportFactory extends
      * Overriding to allow for proper configuration through reflection but delegate to get common
      * configuration
      */
+    @SuppressWarnings("rawtypes")
     public Transport compositeConfigure(Transport transport, WireFormat format, Map options) {
 
         SslTransport sslTransport = (SslTransport)transport.narrow(SslTransport.class);
@@ -120,14 +110,12 @@ public class SslTransportFactory extends
         return new SslTransport(wf, (SSLSocketFactory)socketFactory, location, localLocation, false);
     }
 
-
-
     /**
      * Creates a new SSL ServerSocketFactory. The given factory will use
      * user-provided key and trust managers (if the user provided them).
-     * 
+     *
      * @return Newly created (Ssl)ServerSocketFactory.
-     * @throws IOException 
+     * @throws IOException
      */
     protected ServerSocketFactory createServerSocketFactory() throws IOException {
         if( SslContext.getCurrentSslContext()!=null ) {
@@ -145,12 +133,12 @@ public class SslTransportFactory extends
     /**
      * Creates a new SSL SocketFactory. The given factory will use user-provided
      * key and trust managers (if the user provided them).
-     * 
+     *
      * @return Newly created (Ssl)SocketFactory.
-     * @throws IOException 
+     * @throws IOException
      */
     protected SocketFactory createSocketFactory() throws IOException {
-        
+
         if( SslContext.getCurrentSslContext()!=null ) {
             SslContext ctx = SslContext.getCurrentSslContext();
             try {
@@ -161,11 +149,10 @@ public class SslTransportFactory extends
         } else {
             return SSLSocketFactory.getDefault();
         }
-        
     }
 
     /**
-     * 
+     *
      * @param km
      * @param tm
      * @param random