You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/10/30 22:09:15 UTC
svn commit: r1403869 - in /activemq/trunk/activemq-core/src:
main/java/org/apache/activemq/transport/stomp/
test/java/org/apache/activemq/transport/stomp/
Author: tabish
Date: Tue Oct 30 21:09:15 2012
New Revision: 1403869
URL: http://svn.apache.org/viewvc?rev=1403869&view=rev
Log:
Fix for https://issues.apache.org/jira/browse/AMQ-4129: adds support for the STOMP v1.2 spec changes.
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java (with props)
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java (with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/LegacyFrameTranslator.java Tue Oct 30 21:09:15 2012
@@ -41,7 +41,6 @@ import com.thoughtworks.xstream.io.json.
*/
public class LegacyFrameTranslator implements FrameTranslator {
-
public ActiveMQMessage convertFrame(ProtocolConverter converter, StompFrame command) throws JMSException, ProtocolException {
final Map<?, ?> headers = command.getHeaders();
final ActiveMQMessage msg;
@@ -59,6 +58,7 @@ public class LegacyFrameTranslator imple
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
+ data.close();
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
@@ -83,6 +83,7 @@ public class LegacyFrameTranslator imple
data.writeInt(command.getContent().length);
data.write(command.getContent());
text.setContent(bytes.toByteSequence());
+ data.close();
} catch (Throwable e) {
throw new ProtocolException("Text could not bet set: " + e, false, e);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/ProtocolConverter.java Tue Oct 30 21:09:15 2012
@@ -110,6 +110,9 @@ public class ProtocolConverter {
private final Map<String, LocalTransactionId> transactions = new ConcurrentHashMap<String, LocalTransactionId>();
private final StompTransport stompTransport;
+ private final ConcurrentHashMap<String, AckEntry> pedingAcks = new ConcurrentHashMap<String, AckEntry>();
+ private final IdGenerator ACK_ID_GENERATOR = new IdGenerator();
+
private final Object commnadIdMutex = new Object();
private int lastCommandId;
private final AtomicBoolean connected = new AtomicBoolean(false);
@@ -121,6 +124,33 @@ public class ProtocolConverter {
private long hbWriteInterval;
private String defaultHeartBeat = Stomp.DEFAULT_HEART_BEAT;
+ private static class AckEntry {
+
+ private String messageId;
+ private StompSubscription subscription;
+
+ public AckEntry(String messageId, StompSubscription subscription) {
+ this.messageId = messageId;
+ this.subscription = subscription;
+ }
+
+ public MessageAck onMessageAck(TransactionId transactionId) {
+ return subscription.onStompMessageAck(messageId, transactionId);
+ }
+
+ public MessageAck onMessageNack(TransactionId transactionId) throws ProtocolException {
+ return subscription.onStompMessageNack(messageId, transactionId);
+ }
+
+ public String getMessageId() {
+ return this.messageId;
+ }
+
+ public StompSubscription getSubscription() {
+ return this.subscription;
+ }
+ }
+
public ProtocolConverter(StompTransport stompTransport, BrokerContext brokerContext) {
this.stompTransport = stompTransport;
this.brokerContext = brokerContext;
@@ -301,15 +331,20 @@ public class ProtocolConverter {
Map<String, String> headers = command.getHeaders();
String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
- if (subscriptionId == null) {
+ if (subscriptionId == null && !this.version.equals(Stomp.V1_2)) {
throw new ProtocolException("NACK received without a subscription id for acknowledge!");
}
String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
- if (messageId == null) {
+ if (messageId == null && !this.version.equals(Stomp.V1_2)) {
throw new ProtocolException("NACK received without a message-id to acknowledge!");
}
+ String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
+ if (ackId == null && this.version.equals(Stomp.V1_2)) {
+ throw new ProtocolException("NACK received without an ack header to acknowledge!");
+ }
+
TransactionId activemqTx = null;
String stompTx = headers.get(Stomp.Headers.TRANSACTION);
if (stompTx != null) {
@@ -319,17 +354,32 @@ public class ProtocolConverter {
}
}
- if (subscriptionId != null) {
+ boolean nacked = false;
+
+ if (ackId != null) {
+ AckEntry pendingAck = this.pedingAcks.get(ackId);
+ if (pendingAck != null) {
+ messageId = pendingAck.getMessageId();
+ MessageAck ack = pendingAck.onMessageNack(activemqTx);
+ if (ack != null) {
+ sendToActiveMQ(ack, createResponseHandler(command));
+ nacked = true;
+ }
+ }
+ } else if (subscriptionId != null) {
StompSubscription sub = this.subscriptions.get(subscriptionId);
if (sub != null) {
MessageAck ack = sub.onStompMessageNack(messageId, activemqTx);
if (ack != null) {
sendToActiveMQ(ack, createResponseHandler(command));
- } else {
- throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
+ nacked = true;
}
}
}
+
+ if (!nacked) {
+ throw new ProtocolException("Unexpected NACK received for message-id [" + messageId + "]");
+ }
}
protected void onStompAck(StompFrame command) throws ProtocolException {
@@ -337,15 +387,20 @@ public class ProtocolConverter {
Map<String, String> headers = command.getHeaders();
String messageId = headers.get(Stomp.Headers.Ack.MESSAGE_ID);
- if (messageId == null) {
+ if (messageId == null && !(this.version.equals(Stomp.V1_2))) {
throw new ProtocolException("ACK received without a message-id to acknowledge!");
}
String subscriptionId = headers.get(Stomp.Headers.Ack.SUBSCRIPTION);
- if (this.version.equals(Stomp.V1_1) && subscriptionId == null) {
+ if (subscriptionId == null && this.version.equals(Stomp.V1_1)) {
throw new ProtocolException("ACK received without a subscription id for acknowledge!");
}
+ String ackId = headers.get(Stomp.Headers.Ack.ACK_ID);
+ if (ackId == null && this.version.equals(Stomp.V1_2)) {
+ throw new ProtocolException("ACK received without a ack id for acknowledge!");
+ }
+
TransactionId activemqTx = null;
String stompTx = headers.get(Stomp.Headers.TRANSACTION);
if (stompTx != null) {
@@ -357,7 +412,19 @@ public class ProtocolConverter {
boolean acked = false;
- if (subscriptionId != null) {
+ if (ackId != null) {
+
+ AckEntry pendingAck = this.pedingAcks.get(ackId);
+ if (pendingAck != null) {
+ messageId = pendingAck.getMessageId();
+ MessageAck ack = pendingAck.onMessageAck(activemqTx);
+ if (ack != null) {
+ sendToActiveMQ(ack, createResponseHandler(command));
+ acked = true;
+ }
+ }
+
+ } else if (subscriptionId != null) {
StompSubscription sub = this.subscriptions.get(subscriptionId);
if (sub != null) {
@@ -370,7 +437,7 @@ public class ProtocolConverter {
} else {
- // TODO: acking with just a message id is very bogus since the same message id
+ // STOMP v1.0: acking with just a message id is very bogus since the same message id
// could have been sent to 2 different subscriptions on the same Stomp connection.
// For example, when 2 subs are created on the same topic.
@@ -505,8 +572,8 @@ public class ProtocolConverter {
}
String selector = headers.remove(Stomp.Headers.Subscribe.SELECTOR);
- if( selector!=null ) {
- consumerInfo.setSelector("convert_string_expressions:"+selector);
+ if (selector != null) {
+ consumerInfo.setSelector("convert_string_expressions:" + selector);
}
IntrospectionSupport.setProperties(consumerInfo, headers, "activemq.");
@@ -606,6 +673,7 @@ public class ProtocolConverter {
if (this.version.equals(Stomp.V1_1)) {
clientId = connectionInfo.getClientId();
}
+
if (durable != null) {
RemoveSubscriptionInfo info = new RemoveSubscriptionInfo();
info.setClientId(clientId);
@@ -733,7 +801,6 @@ public class ProtocolConverter {
}
}
});
-
}
});
}
@@ -775,7 +842,19 @@ public class ProtocolConverter {
MessageDispatch md = (MessageDispatch)command;
StompSubscription sub = subscriptionsByConsumerId.get(md.getConsumerId());
if (sub != null) {
- sub.onMessageDispatch(md);
+ String ackId = null;
+ if (version.equals(Stomp.V1_2) && sub.getAckMode() != Stomp.Headers.Subscribe.AckModeValues.AUTO) {
+ AckEntry pendingAck = new AckEntry(md.getMessage().getMessageId().toString(), sub);
+ ackId = this.ACK_ID_GENERATOR.generateId();
+ this.pedingAcks.put(ackId, pendingAck);
+ }
+ try {
+ sub.onMessageDispatch(md, ackId);
+ } catch (Exception ex) {
+ if (ackId != null) {
+ this.pedingAcks.remove(ackId);
+ }
+ }
}
} else if (command.getDataStructureType() == CommandTypes.KEEP_ALIVE_INFO) {
stompTransport.sendToStomp(ping);
@@ -846,15 +925,11 @@ public class ProtocolConverter {
}
try {
-
StompInactivityMonitor monitor = this.stompTransport.getInactivityMonitor();
-
monitor.setReadCheckTime(hbReadInterval);
monitor.setInitialDelayTime(Math.min(hbReadInterval, hbWriteInterval));
monitor.setWriteCheckTime(hbWriteInterval);
-
monitor.startMonitoring();
-
} catch(Exception ex) {
hbReadInterval = 0;
hbWriteInterval = 0;
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/Stomp.java Tue Oct 30 21:09:15 2012
@@ -32,11 +32,12 @@ public interface Stomp {
String COMMA = ",";
String V1_0 = "1.0";
String V1_1 = "1.1";
+ String V1_2 = "1.2";
String DEFAULT_HEART_BEAT = "0,0";
String DEFAULT_VERSION = "1.0";
String EMPTY = "";
- String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.1", "1.0"};
+ String[] SUPPORTED_PROTOCOL_VERSIONS = {"1.2", "1.1", "1.0"};
String TEXT_PLAIN = "text/plain";
String TRUE = "true";
@@ -100,6 +101,7 @@ public interface Stomp {
public interface Message {
String MESSAGE_ID = "message-id";
+ String ACK_ID = "ack";
String DESTINATION = "destination";
String CORRELATION_ID = "correlation-id";
String EXPIRATION_TIME = "expires";
@@ -159,6 +161,7 @@ public interface Stomp {
public interface Ack {
String MESSAGE_ID = "message-id";
String SUBSCRIPTION = "subscription";
+ String ACK_ID = "id";
}
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompQueueBrowserSubscription.java Tue Oct 30 21:09:15 2012
@@ -32,10 +32,10 @@ public class StompQueueBrowserSubscripti
}
@Override
- void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+ void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
if (md.getMessage() != null) {
- super.onMessageDispatch(md);
+ super.onMessageDispatch(md, ackId);
} else {
StompFrame browseDone = new StompFrame(Stomp.Responses.MESSAGE);
browseDone.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, this.getSubscriptionId());
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompSubscription.java Tue Oct 30 21:09:15 2012
@@ -16,9 +16,6 @@
*/
package org.apache.activemq.transport.stomp;
-import org.apache.activemq.command.*;
-
-import javax.jms.JMSException;
import java.io.IOException;
import java.util.Iterator;
import java.util.LinkedHashMap;
@@ -26,6 +23,17 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Map.Entry;
+import javax.jms.JMSException;
+
+import org.apache.activemq.command.ActiveMQBytesMessage;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
+
/**
* Keeps track of the STOMP subscription so that acking is correctly done.
*
@@ -55,7 +63,7 @@ public class StompSubscription {
this.transformation = transformation;
}
- void onMessageDispatch(MessageDispatch md) throws IOException, JMSException {
+ void onMessageDispatch(MessageDispatch md, String ackId) throws IOException, JMSException {
ActiveMQMessage message = (ActiveMQMessage)md.getMessage();
if (ackMode == CLIENT_ACK) {
synchronized (this) {
@@ -73,7 +81,7 @@ public class StompSubscription {
boolean ignoreTransformation = false;
if (transformation != null && !( message instanceof ActiveMQBytesMessage ) ) {
- message.setReadOnlyProperties(false);
+ message.setReadOnlyProperties(false);
message.setStringProperty(Stomp.Headers.TRANSFORMATION, transformation);
} else {
if (message.getStringProperty(Stomp.Headers.TRANSFORMATION) != null) {
@@ -88,6 +96,10 @@ public class StompSubscription {
command.getHeaders().put(Stomp.Headers.Message.SUBSCRIPTION, subscriptionId);
}
+ if (ackId != null) {
+ command.getHeaders().put(Stomp.Headers.Message.ACK_ID, ackId);
+ }
+
protocolConverter.getStompTransport().sendToStomp(command);
}
Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java?rev=1403869&r1=1403868&r2=1403869&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/stomp/StompWireFormat.java Tue Oct 30 21:09:15 2012
@@ -154,12 +154,23 @@ public class StompWireFormat implements
ByteArrayOutputStream baos = new ByteArrayOutputStream(maxLength);
while ((b = in.readByte()) != '\n') {
if (baos.size() > maxLength) {
+ baos.close();
throw new ProtocolException(errorMessage, true);
}
baos.write(b);
}
+
baos.close();
- return baos.toByteSequence();
+ ByteSequence line = baos.toByteSequence();
+
+ if (stompVersion.equals(Stomp.V1_0) || stompVersion.equals(Stomp.V1_2)) {
+ int lineLength = line.getLength();
+ if (lineLength > 0 && line.data[lineLength-1] == '\r') {
+ line.setLength(lineLength-1);
+ }
+ }
+
+ return line;
}
protected String parseAction(DataInput in) throws IOException {
@@ -177,6 +188,7 @@ public class StompWireFormat implements
}
}
}
+
return action;
}
@@ -206,6 +218,7 @@ public class StompWireFormat implements
}
ByteSequence nameSeq = stream.toByteSequence();
+
String name = new String(nameSeq.getData(), nameSeq.getOffset(), nameSeq.getLength(), "UTF-8");
String value = decodeHeader(headerLine);
if (stompVersion.equals(Stomp.V1_0)) {
@@ -213,8 +226,11 @@ public class StompWireFormat implements
}
if (!headers.containsKey(name)) {
- headers.put(name, value);
+ headers.put(name, value);
}
+
+ stream.close();
+
} catch (Exception e) {
throw new ProtocolException("Unable to parser header line [" + line + "]", true);
}
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java?rev=1403869&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java Tue Oct 30 21:09:15 2012
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+public class Stomp12NIOSSLTest extends Stomp12Test {
+
+ protected void setUp() throws Exception {
+ bindAddress = "stomp+nio+ssl://localhost:61613";
+ confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-auth-broker.xml";
+ System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+ System.setProperty("javax.net.ssl.trustStorePassword", "password");
+ System.setProperty("javax.net.ssl.trustStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+ System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
+ super.setUp();
+ }
+
+ protected Socket createSocket(URI connectUri) throws IOException {
+ SocketFactory factory = SSLSocketFactory.getDefault();
+ return factory.createSocket("127.0.0.1", connectUri.getPort());
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOSSLTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java?rev=1403869&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java Tue Oct 30 21:09:15 2012
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+public class Stomp12NIOTest extends Stomp12Test {
+
+ @Override
+ protected void setUp() throws Exception {
+ bindAddress = "stomp+nio://localhost:61612";
+ confUri = "xbean:org/apache/activemq/transport/stomp/niostomp-auth-broker.xml";
+ super.setUp();
+ }
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12NIOTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java?rev=1403869&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java Tue Oct 30 21:09:15 2012
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+
+import javax.net.SocketFactory;
+import javax.net.ssl.SSLSocketFactory;
+
+
+
+/**
+ *
+ */
+public class Stomp12SslAuthTest extends Stomp12Test {
+
+
+ protected void setUp() throws Exception {
+
+ // Test mutual authentication on both stomp and standard ssl transports
+ bindAddress = "stomp+ssl://localhost:61612";
+ confUri = "xbean:org/apache/activemq/transport/stomp/sslstomp-mutual-auth-broker.xml";
+ jmsUri="ssl://localhost:61617";
+
+ System.setProperty("javax.net.ssl.trustStore", "src/test/resources/client.keystore");
+ System.setProperty("javax.net.ssl.trustStorePassword", "password");
+ System.setProperty("javax.net.ssl.trustStoreType", "jks");
+ System.setProperty("javax.net.ssl.keyStore", "src/test/resources/server.keystore");
+ System.setProperty("javax.net.ssl.keyStorePassword", "password");
+ System.setProperty("javax.net.ssl.keyStoreType", "jks");
+ //System.setProperty("javax.net.debug","ssl,handshake");
+ super.setUp();
+ }
+
+ protected Socket createSocket(URI connectUri) throws IOException {
+ SocketFactory factory = SSLSocketFactory.getDefault();
+ return factory.createSocket("127.0.0.1", connectUri.getPort());
+ }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12SslAuthTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java?rev=1403869&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java Tue Oct 30 21:09:15 2012
@@ -0,0 +1,387 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.stomp;
+
+import java.io.IOException;
+import java.net.Socket;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.UnknownHostException;
+
+import javax.jms.Connection;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class Stomp12Test extends CombinationTestSupport {
+
+ private static final Logger LOG = LoggerFactory.getLogger(Stomp12Test.class);
+
+ protected String bindAddress = "stomp://localhost:61613";
+ protected String confUri = "xbean:org/apache/activemq/transport/stomp/stomp-auth-broker.xml";
+ protected String jmsUri = "vm://localhost";
+
+ private BrokerService broker;
+ private StompConnection stompConnection = new StompConnection();
+ private Connection connection;
+
+ @Override
+ protected void setUp() throws Exception {
+
+ broker = BrokerFactory.createBroker(new URI(confUri));
+ broker.start();
+ broker.waitUntilStarted();
+
+ stompConnect();
+
+ ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(jmsUri);
+ connection = cf.createConnection("system", "manager");
+ connection.start();
+ }
+
+ private void stompConnect() throws IOException, URISyntaxException, UnknownHostException {
+ URI connectUri = new URI(bindAddress);
+ stompConnection.open(createSocket(connectUri));
+ }
+
+ protected Socket createSocket(URI connectUri) throws IOException {
+ return new Socket("127.0.0.1", connectUri.getPort());
+ }
+
+ protected String getQueueName() {
+ return getClass().getName() + "." + getName();
+ }
+
+ @Override
+ protected void tearDown() throws Exception {
+ try {
+ stompDisconnect();
+ } catch(Exception e) {
+ // Some tests explicitly disconnect from stomp so can ignore
+ } finally {
+ broker.stop();
+ broker.waitUntilStopped();
+ }
+ }
+
+ private void stompDisconnect() throws IOException {
+ if (stompConnection != null) {
+ stompConnection.close();
+ stompConnection = null;
+ }
+ }
+
+ @Test
+ public void testTelnetStyleSends() throws Exception {
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "CONNECT\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ String send = "SUBSCRIBE\r\n" +
+ "id:1\r\n" +
+ "destination:/queue/" + getQueueName() + "\r\n" +
+ "receipt:1\r\n" +
+ "\r\n"+
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(send);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(disconnect);
+ }
+
+ @Test
+ public void testClientAckWithoutAckId() throws Exception {
+
+ stompConnection.setVersion(Stomp.V1_2);
+
+ String connect = "STOMP\r\n" +
+ "accept-version:1.2\r\n" +
+ "login:system\r\n" +
+ "passcode:manager\r\n" +
+ "\r\n" +
+ "\u0000\r\n";
+
+ stompConnection.sendFrame(connect);
+
+ String f = stompConnection.receiveFrame();
+ LOG.info("Broker sent: " + f);
+
+ assertTrue(f.startsWith("CONNECTED"));
+ assertTrue(f.indexOf("version:1.2") >= 0);
+ assertTrue(f.indexOf("session:") >= 0);
+
+ String subscribe = "SUBSCRIBE\n" +
+ "id:1\n" +
+ "ack:client\n" +
+ "destination:/queue/" + getQueueName() + "\n" +
+ "receipt:1\n" +
+ "\n" + Stomp.NULL;
+
+ stompConnection.sendFrame(subscribe);
+
+ StompFrame receipt = stompConnection.receive();
+ LOG.info("Broker sent: " + receipt);
+ assertTrue(receipt.getAction().startsWith("RECEIPT"));
+ String receiptId = receipt.getHeaders().get("receipt-id");
+ assertEquals("1", receiptId);
+
+ String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL;
+ stompConnection.sendFrame(message);
+
+ StompFrame received = stompConnection.receive();
+ assertTrue(received.getAction().equals("MESSAGE"));
+ assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+ assertEquals("1", received.getBody());
+
+ String frame = "ACK\n" + "message-id:" +
+ received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(frame);
+
+ received = stompConnection.receive();
+ assertTrue(received.getAction().equals("ERROR"));
+ LOG.info("Broker sent: " + received);
+
+ String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+ stompConnection.sendFrame(disconnect);
+ }
+
+    /**
+     * Exercises the "client" ack mode over STOMP 1.2.
+     *
+     * <p>Two messages ("1" and "2") are sent and received, then only the second
+     * one is acknowledged using the value of its ack header in the ACK frame's
+     * "id" header. After disconnecting, reconnecting and resubscribing, a third
+     * message is sent and the test expects it to be the very next delivery,
+     * i.e. neither "1" nor "2" may be redelivered.
+     *
+     * @throws Exception if the connection fails or the thread is interrupted
+     *                   while waiting between disconnect and reconnect
+     */
+    @Test
+    public void testClientAck() throws Exception {
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        // The connect frame uses the STOMP command and CRLF line endings.
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.contains("version:1.2"));
+        assertTrue(f.contains("session:"));
+
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "ack:client\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+        message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "2" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        // assertEquals reports the actual action on failure, unlike assertTrue(equals).
+        StompFrame received = stompConnection.receive();
+        LOG.info("Stomp Message: {}", received);
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("1", received.getBody());
+
+        received = stompConnection.receive();
+        LOG.info("Stomp Message: {}", received);
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("2", received.getBody());
+
+        // Acknowledge only the second message; "1" is never acked explicitly.
+        String frame = "ACK\n" + "id:" +
+            received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "DISCONNECT\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // Presumably gives the broker time to process the DISCONNECT before we
+        // reconnect. An interrupt is propagated (the method declares
+        // "throws Exception") instead of being silently swallowed.
+        Thread.sleep(400);
+
+        // Reconnect, resubscribe to the same queue and send one more message;
+        // it must be the first frame delivered to the new subscription.
+        stompConnect();
+        stompConnection.sendFrame(connect);
+        frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        stompConnection.sendFrame(subscribe);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "3" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        received = stompConnection.receive();
+        LOG.info("Stomp Message: {}", received);
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("3", received.getBody());
+
+        frame = "ACK\n" + "id:" +
+            received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(disconnect);
+    }
+
+    /**
+     * Exercises the "client-individual" ack mode over STOMP 1.2.
+     *
+     * <p>Two messages ("1" and "2") are sent and received, then only the second
+     * one is acknowledged using the value of its ack header in the ACK frame's
+     * "id" header. After disconnecting, reconnecting and resubscribing, a third
+     * message is sent. The test expects message "1" to be delivered again first
+     * (it was never acknowledged), followed by message "3"; both are then
+     * acknowledged individually.
+     *
+     * @throws Exception if the connection fails or the thread is interrupted
+     *                   while waiting between disconnect and reconnect
+     */
+    @Test
+    public void testClientIndividualAck() throws Exception {
+
+        stompConnection.setVersion(Stomp.V1_2);
+
+        // The connect frame uses the STOMP command and CRLF line endings.
+        String connect = "STOMP\r\n" +
+                         "accept-version:1.2\r\n" +
+                         "login:system\r\n" +
+                         "passcode:manager\r\n" +
+                         "\r\n" +
+                         "\u0000\r\n";
+
+        stompConnection.sendFrame(connect);
+
+        String f = stompConnection.receiveFrame();
+        LOG.info("Broker sent: " + f);
+
+        assertTrue(f.startsWith("CONNECTED"));
+        assertTrue(f.contains("version:1.2"));
+        assertTrue(f.contains("session:"));
+
+        String subscribe = "SUBSCRIBE\n" +
+                           "id:1\n" +
+                           "ack:client-individual\n" +
+                           "destination:/queue/" + getQueueName() + "\n" +
+                           "receipt:1\n" +
+                           "\n" + Stomp.NULL;
+
+        stompConnection.sendFrame(subscribe);
+
+        StompFrame receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        String receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        String message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "1" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+        message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "2" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        // assertEquals reports the actual action on failure, unlike assertTrue(equals).
+        StompFrame received = stompConnection.receive();
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("1", received.getBody());
+
+        received = stompConnection.receive();
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("2", received.getBody());
+
+        // Acknowledge only the second message; "1" is left unacknowledged.
+        String frame = "ACK\n" + "id:" +
+            received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // Presumably gives the broker time to process the DISCONNECT before we
+        // reconnect. An interrupt is propagated (the method declares
+        // "throws Exception") instead of being silently swallowed.
+        Thread.sleep(400);
+
+        // Reconnect, resubscribe to the same queue and send one more message.
+        stompConnect();
+        stompConnection.sendFrame(connect);
+        frame = stompConnection.receiveFrame();
+        LOG.debug("Broker sent: " + frame);
+        assertTrue(frame.startsWith("CONNECTED"));
+
+        stompConnection.sendFrame(subscribe);
+
+        receipt = stompConnection.receive();
+        LOG.info("Broker sent: " + receipt);
+        assertTrue(receipt.getAction().startsWith("RECEIPT"));
+        receiptId = receipt.getHeaders().get("receipt-id");
+        assertEquals("1", receiptId);
+
+        message = "SEND\n" + "destination:/queue/" + getQueueName() + "\n\n" + "3" + Stomp.NULL;
+        stompConnection.sendFrame(message);
+
+        // The unacknowledged message "1" is expected to arrive first ...
+        received = stompConnection.receive();
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("1", received.getBody());
+
+        frame = "ACK\n" + "id:" +
+            received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        // ... followed by the newly sent message "3".
+        received = stompConnection.receive();
+        assertEquals("MESSAGE", received.getAction());
+        assertTrue(received.getHeaders().containsKey(Stomp.Headers.Message.ACK_ID));
+        assertEquals("3", received.getBody());
+
+        frame = "ACK\n" + "id:" +
+            received.getHeaders().get(Stomp.Headers.Message.ACK_ID) + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(frame);
+
+        String disconnect = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
+        stompConnection.sendFrame(disconnect);
+    }
+
+}
Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/stomp/Stomp12Test.java
------------------------------------------------------------------------------
svn:eol-style = native