You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/30 22:09:15 UTC

svn commit: r1403869 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/stomp/ test/java/org/apache/activemq/transport/stomp/

Author: tabish
Date: Tue Oct 30 21:09:15 2012
New Revision: 1403869

URL: http://svn.apache.org/viewvc?rev=1403869&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-4129: adds support for the STOMP v1.2 spec changes.

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java   (with props)
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Tue Oct 30 21:09:15 2012
@@ -41,7 +41,6 @@ import com.thoughtworks.xstream.io.json.
  */
 public class LegacyFrameTranslator implements FrameTranslator {
 
-
     public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
         final Map<?, ?> headers = command.getHeaders();
         final ActiveMQMessage msg;
@@ -59,6 +58,7 @@ public class LegacyFrameTranslator imple
                     data.writeInt(command.getContent().length);
                     data.write(command.getContent());
                     text.setContent(bytes.toByteSequence());
+                    data.close();
                 } catch (Throwable e) {
                     throw new ProtocolException("Text could not bet set: " + e, false, e);
                 }
@@ -83,6 +83,7 @@ public class LegacyFrameTranslator imple
                 data.writeInt(command.getContent().length);
                 data.write(command.getContent());
                 text.setContent(bytes.toByteSequence());
+                data.close();
             } catch (Throwable e) {
                 throw new ProtocolException("Text could not bet set: " + e, false, e);
             }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Tue Oct 30 21:09:15 2012
@@ -110,6 +110,9 @@ public class ProtocolConverter {
     private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
     private final StompTransport stompTransport;
 
+    private final ConcurrentHashMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
+    private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+
     private final Object commnadIdMutex = new Object();
     private int lastCommandId;
     private final AtomicBoolean connected = new AtomicBoolean(false);
@@ -121,6 +124,33 @@ public class ProtocolConverter {
     private long hbWriteInterval;
     private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
 
+    private static class AckEntry {
+
+        private String messageId;
+        private StompSubscription subscription;
+
+        public AckEntry(String messageId, StompSubscription subscription) {
+            this.messageId = messageId;
+            this.subscription = subscription;
+        }
+
+        public MessageAck onMessageAck(TransactionId transactionId) {
+            return subscription.onStompMessageAck(messageId, transactionId);
+        }
+
+        public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
+            return subscription.onStompMessageNack(messageId, transactionId);
+        }
+
+        public String getMessageId() {
+            return this.messageId;
+        }
+
+        public StompSubscription getSubscription() {
+            return this.subscription;
+        }
+    }
+
     public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
         this.stompTransport = stompTransport;
         this.brokerContext = brokerContext;
@@ -301,15 +331,20 @@ public class ProtocolConverter {
         Map<String, String> headers = command.getHeaders();
 
         String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
-        if (subscriptionId == null) {
+        if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) {
             throw new ProtocolException("NACK received without a subscription id for acknowledge!");
         }
 
         String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
-        if (messageId == null) {
+        if (messageId == null && !this.version.equals(Stomp.V1_2)) {
             throw new ProtocolException("NACK received without a message-id to acknowledge!");
         }
 
+        String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
+        if (ackId == null && this.version.equals(Stomp.V1_2)) {
+            throw new ProtocolException("NACK received without an ack header to acknowledge!");
+        }
+
         TransactionId activemqTx = null;
         String stompTx = headers.get(Stomp.Headers.TRANSACTION);
         if (stompTx != null) {
@@ -319,17 +354,32 @@ public class ProtocolConverter {
             }
         }
 
-        if (subscriptionId != null) {
+        boolean nacked = false;
+
+        if (ackId != null) {
+            AckEntry pendingAck = this.pedingAcks.get(ackId);
+            if (pendingAck != null) {
+                messageId = pendingAck.getMessageId();
+                MessageAck ack = pendingAck.onMessageNack(activemqTx);
+                if (ack != null) {
+                    sendToActiveMQ(ack, createResponseHandler(command));
+                    nacked = true;
+                }
+            }
+        } else if (subscriptionId != null) {
             StompSubscription sub = this.subscriptions.get(subscriptionId);
             if (sub != null) {
                 MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
                 if (ack != null) {
                     sendToActiveMQ(ack, createResponseHandler(command));
-                } else {
-                    throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
+                    nacked = true;
                 }
             }
         }
+
+        if (!nacked) {
+            throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
+        }
     }
 
     protected void onStompAck(StompFrame command) throws ProtocolException {
@@ -337,15 +387,20 @@ public class ProtocolConverter {
 
         Map<String, String> headers = command.getHeaders();
         String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
-        if (messageId == null) {
+        if (messageId == null && !(this.version.equals(Stomp.V1_2))) {
             throw new ProtocolException("ACK received without a message-id to acknowledge!");
         }
 
         String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
-        if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+        if (subscriptionId == null && this.version.equals(Stomp.V1_1)) {
             throw new ProtocolException("ACK received without a subscription id for acknowledge!");
         }
 
+        String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
+        if (ackId == null && this.version.equals(Stomp.V1_2)) {
+            throw new ProtocolException("ACK received without a ack id for acknowledge!");
+        }
+
         TransactionId activemqTx = null;
         String stompTx = headers.get(Stomp.Headers.TRANSACTION);
         if (stompTx != null) {
@@ -357,7 +412,19 @@ public class ProtocolConverter {
 
         boolean acked = false;
 
-        if (subscriptionId != null) {
+        if (ackId != null) {
+
+            AckEntry pendingAck = this.pedingAcks.get(ackId);
+            if (pendingAck != null) {
+                messageId = pendingAck.getMessageId();
+                MessageAck ack = pendingAck.onMessageAck(activemqTx);
+                if (ack != null) {
+                    sendToActiveMQ(ack, createResponseHandler(command));
+                    acked = true;
+                }
+            }
+
+        } else if (subscriptionId != null) {
 
             StompSubscription sub = this.subscriptions.get(subscriptionId);
             if (sub != null) {
@@ -370,7 +437,7 @@ public class ProtocolConverter {
 
         } else {
 
-            // TODO: acking with just a message id is very bogus since the same message id
+            // STOMP v1.0: acking with just a message id is very bogus since the same message id
             // could have been sent to 2 different subscriptions on the same Stomp connection.
             // For example, when 2 subs are created on the same topic.
 
@@ -505,8 +572,8 @@ public class ProtocolConverter {
         }
 
         String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
-        if( selector!=null ) {
-            consumerInfo.setSelector("convert_string_expressions:"+selector);
+        if (selector != null) {
+            consumerInfo.setSelector("convert_string_expressions:" + selector);
         }
 
         IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
@@ -606,6 +673,7 @@ public class ProtocolConverter {
         if (this.version.equals(Stomp.V1_1)) {
             clientId = connectionInfo.getClientId();
         }
+
         if (durable != null) {
             RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
             info.setClientId(clientId);
@@ -733,7 +801,6 @@ public class ProtocolConverter {
                         }
                     }
                 });
-
             }
         });
     }
@@ -775,7 +842,19 @@ public class ProtocolConverter {
             MessageDispatch md = (MessageDispatch)command;
             StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
             if (sub != null) {
-                sub.onMessageDispatch(md);
+                String ackId = null;
+                if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO) {
+                    AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
+                    ackId = this.ACK_ID_GENERATOR.generateId();
+                    this.pedingAcks.put(ackId, pendingAck);
+                }
+                try {
+                    sub.onMessageDispatch(md, ackId);
+                } catch (Exception ex) {
+                    if (ackId != null) {
+                        this.pedingAcks.remove(ackId);
+                    }
+                }
             }
         } else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
             stompTransport.sendToStomp(ping);
@@ -846,15 +925,11 @@ public class ProtocolConverter {
             }
 
             try {
-
                 StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
-
                 monitor.setReadCheckTime(hbReadInterval);
                 monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
                 monitor.setWriteCheckTime(hbWriteInterval);
-
                 monitor.startMonitoring();
-
             } catch(Exception ex) {
                 hbReadInterval = 0;
                 hbWriteInterval = 0;

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java Tue Oct 30 21:09:15 2012
@@ -32,11 +32,12 @@ public interface Stomp {
     String COMMA = ",";
     String V1_0 = "1.0";
     String V1_1 = "1.1";
+    String V1_2 = "1.2";
     String DEFAULT_HEART_BEAT = "0,0";
     String DEFAULT_VERSION = "1.0";
     String EMPTY = "";
 
-    String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.1", "1.0"};
+    String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.2", "1.1", "1.0"};
 
     String TEXT_PLAIN = "text/plain";
     String TRUE = "true";
@@ -100,6 +101,7 @@ public interface Stomp {
 
         public interface Message {
             String MESSAGE_ID = "message-id";
+            String ACK_ID = "ack";
             String DESTINATION = "destination";
             String CORRELATION_ID = "correlation-id";
             String EXPIRATION_TIME = "expires";
@@ -159,6 +161,7 @@ public interface Stomp {
         public interface Ack {
             String MESSAGE_ID = "message-id";
             String SUBSCRIPTION = "subscription";
+            String ACK_ID = "id";
         }
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java Tue Oct 30 21:09:15 2012
@@ -32,10 +32,10 @@ public class StompQueueBrowserSubscripti
     }
 
     @Override
-    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+    void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
 
         if (md.getMessage() != null) {
-            super.onMessageDispatch(md);
+            super.onMessageDispatch(md, ackId);
         } else {
             StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
             browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Tue Oct 30 21:09:15 2012
@@ -16,9 +16,6 @@
  */
 package org.apache.activemq.transport.stomp;
 
-import org.apache.activemq.command.*;
-
-import javax.jms.JMSException;
 import java.io.IOException;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
@@ -26,6 +23,17 @@ import java.util.LinkedList;
 import java.util.Map;
 import java.util.Map.Entry;
 
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+
 /**
  * Keeps track of the STOMP subscription so that acking is correctly done.
  *
@@ -55,7 +63,7 @@ public class StompSubscription {
         this.transformation = transformation;
     }
 
-    void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+    void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
         ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
         if (ackMode == CLIENT_ACK) {
             synchronized (this) {
@@ -73,7 +81,7 @@ public class StompSubscription {
         boolean ignoreTransformation = false;
 
         if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
-               message.setReadOnlyProperties(false);
+            message.setReadOnlyProperties(false);
             message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
         } else {
             if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
@@ -88,6 +96,10 @@ public class StompSubscription {
             command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
         }
 
+        if (ackId != null) {
+            command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
+        }
+
         protocolConverter.getStompTransport().sendToStomp(command);
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Tue Oct 30 21:09:15 2012
@@ -154,12 +154,23 @@ public class StompWireFormat implements 
         ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
         while ((b = in.readByte()) != '\n') {
             if (baos.size() > maxLength) {
+                baos.close();
                 throw new ProtocolException(errorMessage, true);
             }
             baos.write(b);
         }
+
         baos.close();
-        return baos.toByteSequence();
+        ByteSequence line = baos.toByteSequence();
+
+        if (stompVersion.equals(Stomp.V1_0) || stompVersion.equals(Stomp.V1_2)) {
+            int lineLength = line.getLength();
+            if (lineLength > 0 && line.data[lineLength-1] == '\r') {
+                line.setLength(lineLength-1);
+            }
+        }
+
+        return line;
     }
 
     protected String parseAction(DataInput in) throws IOException {
@@ -177,6 +188,7 @@ public class StompWireFormat implements 
                 }
             }
         }
+
         return action;
     }
 
@@ -206,6 +218,7 @@ public class StompWireFormat implements 
                     }
 
                     ByteSequence nameSeq = stream.toByteSequence();
+
                     String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8");
                     String value = decodeHeader(headerLine);
                     if (stompVersion.equals(Stomp.V1_0)) {
@@ -213,8 +226,11 @@ public class StompWireFormat implements 
                     }
 
                     if (!headers.containsKey(name)) {
-                    	headers.put(name, value);
+                        headers.put(name, value);
                     }
+
+                    stream.close();
+
                 } catch (Exception e) {
                     throw new ProtocolException("Unable to parser header line [" + line + "]", true);
                 }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java?rev=1403869&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java Tue Oct 30 21:09:15 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+public class Stomp12NIOSSLTest extends Stomp12Test {
+
+    protected void setUp() throws Exception {
+        bindAddress = "stomp+nio+ssl://localhost:61613";
+        confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+        super.setUp();
+    }
+
+    protected Socket createSocket(URI connectUri) throws IOException {
+        SocketFactory factory = SSLSocketFactory.getDefault();
+        return factory.createSocket("127.0.0.1", connectUri.getPort());
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java?rev=1403869&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java Tue Oct 30 21:09:15 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+public class Stomp12NIOTest extends Stomp12Test {
+
+    @Override
+    protected void setUp() throws Exception {
+        bindAddress = "stomp+nio://localhost:61612";
+        confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
+        super.setUp();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java?rev=1403869&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java Tue Oct 30 21:09:15 2012
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+
+
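+/**
+ * Runs the {@link Stomp12Test} cases over the stomp+ssl transport against a broker configured for mutual SSL authentication.
+ */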
+public class Stomp12SslAuthTest extends Stomp12Test {
+
+
+    protected void setUp() throws Exception {
+
+        // Test mutual authentication on both stomp and standard ssl transports
+        bindAddress = "stomp+ssl://localhost:61612";
+        confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-mutual-auth-broker.xml";
+        jmsUri="ssl://localhost:61617";
+
+        System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+        System.setProperty("javax.net.ssl.trustStorePassword", "password");
+        System.setProperty("javax.net.ssl.trustStoreType", "jks");
+        System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+        System.setProperty("javax.net.ssl.keyStorePassword", "password");
+        System.setProperty("javax.net.ssl.keyStoreType", "jks");
+        //System.setProperty("javax.net.debug","ssl,handshake");
+        super.setUp();
+    }
+
+    protected Socket createSocket(URI connectUri) throws IOException {
+        SocketFactory factory = SSLSocketFactory.getDefault();
+        return factory.createSocket("127.0.0.1", connectUri.getPort());
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java?rev=1403869&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java Tue Oct 30 21:09:15 2012
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import javax.jms.Connection;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Stomp12Test extends CombinationTestSupport {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Stomp12Test.class);
+
+    // Endpoint settings. They are protected and non-final so that subclasses
+    // (e.g. Stomp12SslAuthTest) can reassign them before calling setUp().
+    // bindAddress: STOMP transport URI; only its port is used to open the socket.
+    protected String bindAddress = "stomp://localhost:61613";
+    // confUri: location of the broker configuration passed to BrokerFactory.
+    protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
+    // jmsUri: URI handed to the ActiveMQConnectionFactory for the JMS connection.
+    protected String jmsUri = "vm://localhost";
+
+    // Per-test fixture state, created in setUp().
+    private BrokerService broker;
+    // Not final: stompDisconnect() sets this to null after closing it.
+    private StompConnection stompConnection = new StompConnection();
+    private Connection connection;
+
+    @Override
+    protected void setUp() throws Exception {
+
+        broker = BrokerFactory.createBroker(new URI(confUri));
+        broker.start();
+        broker.waitUntilStarted();
+
+        stompConnect();
+
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri);
+        connection = cf.createConnection("system", "manager");
+        connection.start();
+    }
+
+    private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
+        URI connectUri = new URI(bindAddress);
+        stompConnection.open(createSocket(connectUri));
+    }
+
+    protected Socket createSocket(URI connectUri) throws IOException {
+        return new Socket("127.0.0.1", connectUri.getPort());
+    }
+
+    protected String getQueueName() {
+        return getClass().getName() + "." + getName();
+    }
+
+    /**
+     * Releases what {@link #setUp()} acquired: the STOMP socket, the JMS
+     * connection and finally the broker.
+     *
+     * Failures while closing either client connection do not prevent the
+     * broker from being stopped.
+     *
+     * @throws Exception if stopping the broker fails
+     */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            stompDisconnect();
+        } catch(Exception e) {
+            // Some tests explicitly disconnect from stomp so can ignore
+        } finally {
+            try {
+                // The JMS connection started in setUp() must be closed too,
+                // otherwise it is still open when the broker is stopped.
+                if (connection != null) {
+                    connection.close();
+                    connection = null;
+                }
+            } catch (Exception e) {
+                // Best effort: a failed close must not keep the broker running.
+                LOG.warn("Failed to close JMS connection during tearDown", e);
+            } finally {
+                // broker is null when setUp() failed before creating it.
+                if (broker != null) {
+                    broker.stop();
+                    broker.waitUntilStopped();
+                }
+            }
+        }
+    }
+
+    /**
+     * Closes the STOMP connection, if there still is one, and clears the
+     * field so that a second call does nothing.
+     *
+     * @throws IOException if closing the connection fails
+     */
+    private void stompDisconnect() throws IOException {
+        if (stompConnection == null) {
+            return;
+        }
+        stompConnection.close();
+        stompConnection = null;
+    }
+
+    /**
+     * Sends CONNECT and SUBSCRIBE frames whose lines end in CRLF (with a
+     * trailing CRLF after the NULL terminator) and checks that the broker
+     * answers with CONNECTED for version 1.2 and then the requested RECEIPT.
+     */
+    @Test
+    public void testTelnetStyleSends() throws Exception {
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        final String connectFrame = "CONNECT\r\n" +
+                                    "accept-version:1.2\r\n" +
+                                    "login:system\r\n" +
+                                    "passcode:manager\r\n" +
+                                    "\r\n" +
+                                    "\u0000\r\n";
+        stompConnection.sendFrame(connectFrame);
+
+        final String connected = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + connected);
+
+        assertTrue(connected.startsWith("CONNECTED"));
+        assertTrue(connected.contains("version:1.2"));
+        assertTrue(connected.contains("session:"));
+
+        final String subscribeFrame = "SUBSCRIBE\r\n" +
+                                      "id:1\r\n" +
+                                      "destination:/queue/" + getQueueName() + "\r\n" +
+                                      "receipt:1\r\n" +
+                                      "\r\n"+
+                                      "\u0000\r\n";
+        stompConnection.sendFrame(subscribeFrame);
+
+        // The receipt header on the SUBSCRIBE must be echoed back as receipt-id.
+        final StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        assertEquals("1", receipt.getHeaders().get("receipt-id"));
+
+        stompConnection.sendFrame("DISCONNECT\n" + "\n\n" + Stomp.NULL);
+    }
+
+    @Test
+    public void testClientAckWithoutAckId() throws Exception {
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "ack:client\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        StompFrame received = stompConnection.receive();
+        assertTrue(received.getAction().equals("MESSAGE"));
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("1", received.getBody());
+
+        String frame = "ACK\n" + "message-id:" +
+                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        received = stompConnection.receive();
+        assertTrue(received.getAction().equals("ERROR"));
+        LOG.info("Broker sent: " + received);
+
+        String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(disconnect);
+    }
+
+    /**
+     * Exercises {@code ack:client} subscriptions with the 1.2 {@code ack}
+     * header.
+     *
+     * Two messages are received and only the second is acknowledged by its
+     * ack id. After disconnecting and reconnecting, a third message is sent
+     * and is expected to be the next frame delivered, i.e. messages 1 and 2
+     * are not redelivered.
+     */
+    @Test
+    public void testClientAck() throws Exception {
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "ack:client\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+        message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "2" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        StompFrame received = stompConnection.receive();
+        LOG.info("Stomp Message: {}", received);
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("1", received.getBody());
+
+        received = stompConnection.receive();
+        LOG.info("Stomp Message: {}", received);
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("2", received.getBody());
+
+        // Only the second message is acknowledged explicitly.
+        String frame = "ACK\n" + "id:" +
+                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // Presumably gives the broker time to process the ACK and DISCONNECT.
+        // An interrupt is propagated (the method throws Exception) rather than swallowed.
+        Thread.sleep(400);
+
+        // Release the old socket explicitly; stompConnect() only hands a new one
+        // to the connection and does not close the previous one itself.
+        stompConnection.close();
+
+        // Reconnect and subscribe again to see what the broker still delivers
+        // for this queue.
+        stompConnect();
+        stompConnection.sendFrame(connect);
+        frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        stompConnection.sendFrame(subscribe);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "3" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        // The first frame after resubscribing must be the new message.
+        received = stompConnection.receive();
+        LOG.info("Stomp Message: {}", received);
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("3", received.getBody());
+
+        frame = "ACK\n" + "id:" +
+                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(disconnect);
+    }
+
+    /**
+     * Exercises {@code ack:client-individual} subscriptions with the 1.2
+     * {@code ack} header.
+     *
+     * Two messages are received and only the second is acknowledged. After
+     * disconnecting and reconnecting, a third message is sent; the test then
+     * expects message 1 to be delivered again, followed by message 3.
+     */
+    @Test
+    public void testClientIndividualAck() throws Exception {
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.indexOf("version:1.2") >= 0);
+        assertTrue(f.indexOf("session:") >= 0);
+
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "ack:client-individual\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+        message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "2" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        StompFrame received = stompConnection.receive();
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("1", received.getBody());
+
+        received = stompConnection.receive();
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("2", received.getBody());
+
+        // Acknowledge message 2 only; message 1 is left unacknowledged.
+        String frame = "ACK\n" + "id:" +
+                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // Presumably gives the broker time to process the ACK and DISCONNECT.
+        // An interrupt is propagated (the method throws Exception) rather than swallowed.
+        Thread.sleep(400);
+
+        // Release the old socket explicitly; stompConnect() only hands a new one
+        // to the connection and does not close the previous one itself.
+        stompConnection.close();
+
+        // Reconnect and subscribe again to see what the broker still delivers
+        // for this queue.
+        stompConnect();
+        stompConnection.sendFrame(connect);
+        frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        stompConnection.sendFrame(subscribe);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "3" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        // Message 1 was never acknowledged, so it is expected first.
+        received = stompConnection.receive();
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("1", received.getBody());
+
+        frame = "ACK\n" + "id:" +
+                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // Then the message sent after reconnecting.
+        received = stompConnection.receive();
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("3", received.getBody());
+
+        frame = "ACK\n" + "id:" +
+                received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(disconnect);
+    }
+
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
------------------------------------------------------------------------------
    svn:eol-style = native