You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/11 19:23:36 UTC
[06/11] activemq-artemis git commit: Stomp refactor + track
autocreation for addresses
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
index 26f2a2f..a55f471 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestBase.java
@@ -26,34 +26,17 @@ import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import java.io.IOException;
-import java.net.Socket;
-import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import io.netty.bootstrap.Bootstrap;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInitializer;
-import io.netty.channel.ChannelOption;
-import io.netty.channel.EventLoopGroup;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.channel.nio.NioEventLoopGroup;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.channel.socket.nio.NioSocketChannel;
-import io.netty.handler.codec.string.StringDecoder;
-import io.netty.handler.codec.string.StringEncoder;
+import java.util.UUID;
+
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.protocol.mqtt.MQTTProtocolManagerFactory;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -63,6 +46,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.security.Role;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
@@ -73,13 +57,16 @@ import org.apache.activemq.artemis.jms.server.config.impl.TopicConfigurationImpl
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
-import org.junit.After;
import org.junit.Before;
public abstract class StompTestBase extends ActiveMQTestBase {
+ protected String hostname = "127.0.0.1";
+
protected final int port = 61613;
private ConnectionFactory connectionFactory;
@@ -98,98 +85,56 @@ public abstract class StompTestBase extends ActiveMQTestBase {
protected String defPass = "wombats";
- protected boolean autoCreateServer = true;
-
- private List<Bootstrap> bootstraps = new ArrayList<>();
-
- // private Channel channel;
-
- private List<BlockingQueue<String>> priorityQueues = new ArrayList<>();
-
- private List<EventLoopGroup> groups = new ArrayList<>();
-
- private List<Channel> channels = new ArrayList<>();
-
// Implementation methods
// -------------------------------------------------------------------------
- @Override
- @Before
- public void setUp() throws Exception {
- super.setUp();
- if (autoCreateServer) {
- server = createServer();
- addServer(server.getActiveMQServer());
- server.start();
- connectionFactory = createConnectionFactory();
- createBootstrap();
-
- if (isSecurityEnabled()) {
- connection = connectionFactory.createConnection("brianm", "wombats");
- } else {
- connection = connectionFactory.createConnection();
- }
- session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- queue = session.createQueue(getQueueName());
- topic = session.createTopic(getTopicName());
- connection.start();
- }
+ public boolean isCompressLargeMessages() {
+ return false;
}
- private void createBootstrap() {
- createBootstrap(0, port);
+ public boolean isSecurityEnabled() {
+ return false;
}
- protected void createBootstrap(int port) {
- createBootstrap(0, port);
+ public boolean isPersistenceEnabled() {
+ return false;
}
- protected void createBootstrap(final int index, int port) {
- priorityQueues.add(index, new ArrayBlockingQueue<String>(1000));
- groups.add(index, new NioEventLoopGroup());
- bootstraps.add(index, new Bootstrap());
- bootstraps.get(index).group(groups.get(index)).channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true).handler(new ChannelInitializer<SocketChannel>() {
- @Override
- public void initChannel(SocketChannel ch) throws Exception {
- addChannelHandlers(index, ch);
- }
- });
-
- // Start the client.
- try {
- channels.add(index, bootstraps.get(index).connect("localhost", port).sync().channel());
- handshake();
- } catch (InterruptedException e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
-
+ public boolean isEnableStompMessageId() {
+ return false;
}
- protected void handshake() throws InterruptedException {
+ public Integer getStompMinLargeMessageSize() {
+ return null;
}
- protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException {
- ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
- ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
- ch.pipeline().addLast(new StompClientHandler(index));
+ public List<String> getIncomingInterceptors() {
+ return null;
}
- protected void setUpAfterServer() throws Exception {
- setUpAfterServer(false);
+ public List<String> getOutgoingInterceptors() {
+ return null;
}
- protected void setUpAfterServer(boolean jmsCompressLarge) throws Exception {
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+
+ server = createServer();
+ server.start();
connectionFactory = createConnectionFactory();
- ActiveMQConnectionFactory activeMQConnectionFactory = (ActiveMQConnectionFactory) connectionFactory;
- activeMQConnectionFactory.setCompressLargeMessage(jmsCompressLarge);
- createBootstrap();
+ ((ActiveMQConnectionFactory)connectionFactory).setCompressLargeMessage(isCompressLargeMessages());
- connection = connectionFactory.createConnection();
- connection.start();
+ if (isSecurityEnabled()) {
+ connection = connectionFactory.createConnection("brianm", "wombats");
+ } else {
+ connection = connectionFactory.createConnection();
+ }
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
queue = session.createQueue(getQueueName());
topic = session.createTopic(getTopicName());
-
+ connection.start();
}
/**
@@ -198,14 +143,30 @@ public abstract class StompTestBase extends ActiveMQTestBase {
*/
protected JMSServerManager createServer() throws Exception {
Map<String, Object> params = new HashMap<>();
- params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
+ params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME + "," + MQTTProtocolManagerFactory.MQTT_PROTOCOL_NAME);
params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
+ if (isEnableStompMessageId()) {
+ params.put(TransportConstants.STOMP_ENABLE_MESSAGE_ID, true);
+ }
+ if (getStompMinLargeMessageSize() != null) {
+ params.put(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, 2048);
+ }
TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
- TransportConfiguration allTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName());
- Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled()).setPersistenceEnabled(true).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
- config.addAcceptorConfiguration(allTransport);
+ Configuration config = createBasicConfig().setSecurityEnabled(isSecurityEnabled())
+ .setPersistenceEnabled(isPersistenceEnabled())
+ .addAcceptorConfiguration(stompTransport)
+ .addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()))
+ .setConnectionTtlCheckInterval(500);
+
+ if (getIncomingInterceptors() != null) {
+ config.setIncomingInterceptorClassNames(getIncomingInterceptors());
+ }
+
+ if (getOutgoingInterceptors() != null) {
+ config.setOutgoingInterceptorClassNames(getOutgoingInterceptors());
+ }
ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
@@ -222,195 +183,348 @@ public abstract class StompTestBase extends ActiveMQTestBase {
}
JMSConfiguration jmsConfig = new JMSConfigurationImpl();
- jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(getQueueName()));
+ jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setBindings(getQueueName()));
jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName()));
server = new JMSServerManagerImpl(activeMQServer, jmsConfig);
server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
return server;
}
- @Override
- @After
- public void tearDown() throws Exception {
- if (autoCreateServer) {
- connection.close();
-
- for (EventLoopGroup group : groups) {
- if (group != null) {
- for (Channel channel : channels) {
- channel.close();
- }
- group.shutdownGracefully(0, 5000, TimeUnit.MILLISECONDS);
- }
- }
- }
- super.tearDown();
+ protected ConnectionFactory createConnectionFactory() {
+ return new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName()));
}
- protected void cleanUp() throws Exception {
- connection.close();
- if (groups.get(0) != null) {
- groups.get(0).shutdown();
- }
+ protected String getQueueName() {
+ return "testQueue";
}
- protected void reconnect() throws Exception {
- reconnect(0);
+ protected String getQueuePrefix() {
+ return "";
}
- protected void reconnect(long sleep) throws Exception {
- groups.get(0).shutdown();
+ protected String getTopicName() {
+ return "testtopic";
+ }
- if (sleep > 0) {
- Thread.sleep(sleep);
- }
+ protected String getTopicPrefix() {
+ return "";
+ }
- createBootstrap();
+ public void sendJmsMessage(String msg) throws Exception {
+ sendJmsMessage(msg, queue);
}
- protected ConnectionFactory createConnectionFactory() {
- return new ActiveMQJMSConnectionFactory(false, new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ public void sendJmsMessage(String msg, Destination destination) throws Exception {
+ MessageProducer producer = session.createProducer(destination);
+ TextMessage message = session.createTextMessage(msg);
+ producer.send(message);
+ IntegrationTestLogger.LOGGER.info("Sent message from JMS client to: " + destination);
}
- protected Socket createSocket() throws IOException {
- return new Socket("localhost", port);
+ public void sendJmsMessage(byte[] data, Destination destination) throws Exception {
+ sendJmsMessage(data, "foo", "xyz", destination);
}
- protected String getQueueName() {
- return "test";
+ public void sendJmsMessage(String msg, String propertyName, String propertyValue) throws Exception {
+ sendJmsMessage(msg.getBytes(StandardCharsets.UTF_8), propertyName, propertyValue, queue);
}
- protected String getQueuePrefix() {
- return "";
+ public void sendJmsMessage(byte[] data,
+ String propertyName,
+ String propertyValue,
+ Destination destination) throws Exception {
+ MessageProducer producer = session.createProducer(destination);
+ BytesMessage message = session.createBytesMessage();
+ message.setStringProperty(propertyName, propertyValue);
+ message.writeBytes(data);
+ producer.send(message);
}
- protected String getTopicName() {
- return "testtopic";
+ public void abortTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
+ ClientStompFrame abortFrame = conn.createFrame(Stomp.Commands.ABORT)
+ .addHeader(Stomp.Headers.TRANSACTION, txID);
+
+ conn.sendFrame(abortFrame);
}
- protected String getTopicPrefix() {
- return "";
+ public void beginTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
+ ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.BEGIN)
+ .addHeader(Stomp.Headers.TRANSACTION, txID);
+
+ conn.sendFrame(beginFrame);
}
- protected void assertChannelClosed() throws InterruptedException {
- assertChannelClosed(0);
+ public void commitTransaction(StompClientConnection conn, String txID) throws IOException, InterruptedException {
+ commitTransaction(conn, txID, false);
}
- protected void assertChannelClosed(int index) throws InterruptedException {
- boolean closed = channels.get(index).closeFuture().await(5000);
- assertTrue("channel not closed", closed);
+ public void commitTransaction(StompClientConnection conn,
+ String txID,
+ boolean receipt) throws IOException, InterruptedException {
+ ClientStompFrame beginFrame = conn.createFrame(Stomp.Commands.COMMIT)
+ .addHeader(Stomp.Headers.TRANSACTION, txID);
+ String uuid = UUID.randomUUID().toString();
+ if (receipt) {
+ beginFrame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+ }
+ ClientStompFrame resp = conn.sendFrame(beginFrame);
+ if (receipt) {
+ assertEquals(uuid, resp.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+ }
}
- public void sendFrame(String data) throws Exception {
- IntegrationTestLogger.LOGGER.info("Sending: " + data);
- sendFrame(0, data);
+ public void ack(StompClientConnection conn,
+ String subscriptionId,
+ ClientStompFrame messageIdFrame) throws IOException, InterruptedException {
+ String messageID = messageIdFrame.getHeader(Stomp.Headers.Message.MESSAGE_ID);
+
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.ACK)
+ .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageID);
+
+ if (subscriptionId != null) {
+ frame.addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId);
+ }
+
+ ClientStompFrame response = conn.sendFrame(frame);
+ if (response != null) {
+ throw new IOException("failed to ack " + response);
+ }
}
- public void sendFrame(int index, String data) throws Exception {
- channels.get(index).writeAndFlush(data);
+ public void ack(StompClientConnection conn,
+ String subscriptionId,
+ String mid,
+ String txID) throws IOException, InterruptedException {
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.ACK)
+ .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId)
+ .addHeader(Stomp.Headers.Message.MESSAGE_ID, mid);
+ if (txID != null) {
+ frame.addHeader(Stomp.Headers.TRANSACTION, txID);
+ }
+
+ conn.sendFrame(frame);
}
- public void sendFrame(byte[] data) throws Exception {
- sendFrame(0, data);
+ public void nack(StompClientConnection conn, String subscriptionId, String messageId) throws IOException, InterruptedException {
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.NACK)
+ .addHeader(Stomp.Headers.Ack.SUBSCRIPTION, subscriptionId)
+ .addHeader(Stomp.Headers.Message.MESSAGE_ID, messageId);
+
+ conn.sendFrame(frame);
}
- public void sendFrame(int index, byte[] data) throws Exception {
- ByteBuf buffer = Unpooled.buffer(data.length);
- buffer.writeBytes(data);
- channels.get(index).writeAndFlush(buffer);
+ public ClientStompFrame subscribe(StompClientConnection conn,
+ String subscriptionId) throws IOException, InterruptedException {
+ return subscribe(conn, subscriptionId, Stomp.Headers.Subscribe.AckModeValues.AUTO, null, null);
}
- public String receiveFrame(long timeOut) throws Exception {
- return receiveFrame(0, timeOut);
+ public ClientStompFrame subscribe(StompClientConnection conn,
+ String subscriptionId,
+ String ack) throws IOException, InterruptedException {
+ return subscribe(conn, subscriptionId, ack, null, null);
}
- public String receiveFrame(int index, long timeOut) throws Exception {
- String msg = priorityQueues.get(index).poll(timeOut, TimeUnit.MILLISECONDS);
- return msg;
+ public ClientStompFrame subscribe(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId) throws IOException, InterruptedException {
+ return subscribe(conn, subscriptionId, ack, durableId, null);
}
- public void sendMessage(String msg) throws Exception {
- sendMessage(msg, queue);
+ public ClientStompFrame subscribe(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId,
+ boolean receipt) throws IOException, InterruptedException {
+ return subscribe(conn, subscriptionId, ack, durableId, null, receipt);
}
- public void sendMessage(String msg, Destination destination) throws Exception {
- MessageProducer producer = session.createProducer(destination);
- TextMessage message = session.createTextMessage(msg);
- producer.send(message);
+ public ClientStompFrame subscribe(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId,
+ String selector) throws IOException, InterruptedException {
+ return subscribe(conn, subscriptionId, ack, durableId, selector, false);
}
- public void sendMessage(byte[] data, Destination destination) throws Exception {
- sendMessage(data, "foo", "xyz", destination);
+ public ClientStompFrame subscribe(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId,
+ String selector,
+ boolean receipt) throws IOException, InterruptedException {
+ return subscribe(conn, subscriptionId, ack, durableId, selector, getQueuePrefix() + getQueueName(), receipt);
}
- public void sendMessage(String msg, String propertyName, String propertyValue) throws Exception {
- sendMessage(msg.getBytes(StandardCharsets.UTF_8), propertyName, propertyValue, queue);
+ public ClientStompFrame subscribe(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId,
+ String selector,
+ String destination,
+ boolean receipt) throws IOException, InterruptedException {
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+ .addHeader(Stomp.Headers.Subscribe.SUBSCRIPTION_TYPE, AddressInfo.RoutingType.ANYCAST.toString())
+ .addHeader(Stomp.Headers.Subscribe.DESTINATION, destination);
+ if (subscriptionId != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.ID, subscriptionId);
+ }
+ if (ack != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.ACK_MODE, ack);
+ }
+ if (durableId != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableId);
+ }
+ if (selector != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.SELECTOR, selector);
+ }
+ String uuid = UUID.randomUUID().toString();
+ if (receipt) {
+ frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+ }
+
+ frame = conn.sendFrame(frame);
+
+ if (receipt) {
+ assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+ }
+
+ return frame;
}
- public void sendMessage(byte[] data,
- String propertyName,
- String propertyValue,
- Destination destination) throws Exception {
- MessageProducer producer = session.createProducer(destination);
- BytesMessage message = session.createBytesMessage();
- message.setStringProperty(propertyName, propertyValue);
- message.writeBytes(data);
- producer.send(message);
+ public ClientStompFrame subscribeTopic(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId) throws IOException, InterruptedException {
+ return subscribeTopic(conn, subscriptionId, ack, durableId, false);
}
- protected void waitForReceipt() throws Exception {
- String frame = receiveFrame(50000);
- assertNotNull(frame);
- assertTrue(frame.indexOf("RECEIPT") > -1);
+ public ClientStompFrame subscribeTopic(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId,
+ boolean receipt) throws IOException, InterruptedException {
+ return subscribeTopic(conn, subscriptionId, ack, durableId, receipt, false);
}
- protected void waitForFrameToTakeEffect() throws InterruptedException {
- // bit of a dirty hack :)
- // another option would be to force some kind of receipt to be returned
- // from the frame
- Thread.sleep(500);
+ public ClientStompFrame subscribeTopic(StompClientConnection conn,
+ String subscriptionId,
+ String ack,
+ String durableId,
+ boolean receipt,
+ boolean noLocal) throws IOException, InterruptedException {
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SUBSCRIBE)
+ .addHeader(Stomp.Headers.Subscribe.DESTINATION, getTopicPrefix() + getTopicName());
+ if (subscriptionId != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.ID, subscriptionId);
+ }
+ if (ack != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.ACK_MODE, ack);
+ }
+ if (durableId != null) {
+ frame.addHeader(Stomp.Headers.Subscribe.DURABLE_SUBSCRIPTION_NAME, durableId);
+ }
+ String uuid = UUID.randomUUID().toString();
+ if (receipt) {
+ frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+ }
+ if (noLocal) {
+ frame.addHeader(Stomp.Headers.Subscribe.NO_LOCAL, "true");
+ }
+
+ frame = conn.sendFrame(frame);
+
+ if (receipt) {
+ assertNotNull("Requested receipt, but response is null", frame);
+ assertTrue(frame.getHeader(Stomp.Headers.Response.RECEIPT_ID).equals(uuid));
+ }
+
+ return frame;
}
- public boolean isSecurityEnabled() {
- return false;
+ public ClientStompFrame unsubscribe(StompClientConnection conn, String subscriptionId) throws IOException, InterruptedException {
+ return unsubscribe(conn, subscriptionId, null, false, false);
}
- class StompClientHandler extends SimpleChannelInboundHandler<String> {
+ public ClientStompFrame unsubscribe(StompClientConnection conn,
+ String subscriptionId,
+ boolean receipt) throws IOException, InterruptedException {
+ return unsubscribe(conn, subscriptionId, null, receipt, false);
+ }
- int index = 0;
+ public ClientStompFrame unsubscribe(StompClientConnection conn,
+ String subscriptionId,
+ String destination,
+ boolean receipt,
+ boolean durable) throws IOException, InterruptedException {
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.UNSUBSCRIBE);
+ if (durable && subscriptionId != null) {
+ frame.addHeader(Stomp.Headers.Unsubscribe.DURABLE_SUBSCRIPTION_NAME, subscriptionId);
+ } else if (!durable && subscriptionId != null) {
+ frame.addHeader(Stomp.Headers.Unsubscribe.ID, subscriptionId);
+ }
- StompClientHandler(int index) {
- this.index = index;
+ if (destination != null) {
+ frame.addHeader(Stomp.Headers.Unsubscribe.DESTINATION, destination);
}
- StringBuffer currentMessage = new StringBuffer("");
+ String uuid = UUID.randomUUID().toString();
- @Override
- protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception {
- currentMessage.append(msg);
- String fullMessage = currentMessage.toString();
- if (fullMessage.contains("\0\n")) {
- int messageEnd = fullMessage.indexOf("\0\n");
- String actualMessage = fullMessage.substring(0, messageEnd);
- fullMessage = fullMessage.substring(messageEnd + 2);
- currentMessage = new StringBuffer("");
- BlockingQueue queue = priorityQueues.get(index);
- if (queue == null) {
- queue = new ArrayBlockingQueue(1000);
- priorityQueues.add(index, queue);
- }
- queue.add(actualMessage);
- if (fullMessage.length() > 0) {
- channelRead(ctx, fullMessage);
- }
- }
+ if (receipt) {
+ frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
}
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
+ frame = conn.sendFrame(frame);
+
+ if (receipt) {
+ assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
+ assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
}
+
+ return frame;
+ }
+
+ public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body) throws IOException, InterruptedException {
+ return send(conn, destination, contentType, body, false);
}
+ public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt) throws IOException, InterruptedException {
+ return send(conn, destination, contentType, body, receipt, null);
+ }
+
+ public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt, AddressInfo.RoutingType destinationType) throws IOException, InterruptedException {
+ return send(conn, destination, contentType, body, receipt, destinationType, null);
+ }
+
+ public ClientStompFrame send(StompClientConnection conn, String destination, String contentType, String body, boolean receipt, AddressInfo.RoutingType destinationType, String txId) throws IOException, InterruptedException {
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND)
+ .addHeader(Stomp.Headers.Send.DESTINATION, destination)
+ .setBody(body);
+
+ if (contentType != null) {
+ frame.addHeader(Stomp.Headers.CONTENT_TYPE, contentType);
+ }
+
+ if (destinationType != null) {
+ frame.addHeader(Stomp.Headers.Send.DESTINATION_TYPE, destinationType.toString());
+ }
+
+ if (txId != null) {
+ frame.addHeader(Stomp.Headers.TRANSACTION, txId);
+ }
+
+ String uuid = UUID.randomUUID().toString();
+
+ if (receipt) {
+ frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
+ }
+ frame = conn.sendFrame(frame);
+
+ if (receipt) {
+ assertEquals(Stomp.Responses.RECEIPT, frame.getCommand());
+ assertEquals(uuid, frame.getHeader(Stomp.Headers.Response.RECEIPT_ID));
+ }
+
+ return frame;
+ }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java
new file mode 100644
index 0000000..9bb9bf2
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithInterceptors.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.Interceptor;
+import org.apache.activemq.artemis.core.protocol.core.Packet;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
+import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StompTestWithInterceptors extends StompTestBase {
+
+ @Override
+ public List<String> getIncomingInterceptors() {
+ List<String> stompIncomingInterceptor = new ArrayList<>();
+ stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestWithInterceptors$MyIncomingStompFrameInterceptor");
+ stompIncomingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestWithInterceptors$MyCoreInterceptor");
+
+ return stompIncomingInterceptor;
+ }
+
+ @Override
+ public List<String> getOutgoingInterceptors() {
+ List<String> stompOutgoingInterceptor = new ArrayList<>();
+ stompOutgoingInterceptor.add("org.apache.activemq.artemis.tests.integration.stomp.StompTestWithInterceptors$MyOutgoingStompFrameInterceptor");
+
+ return stompOutgoingInterceptor;
+ }
+
+ @Test
+ public void stompFrameInterceptor() throws Exception {
+ MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear();
+ MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear();
+
+ Thread.sleep(200);
+
+ // So we clear them here
+ MyCoreInterceptor.incomingInterceptedFrames.clear();
+
+ StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("subscription-type", "ANYCAST");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+ conn.sendFrame(subFrame);
+
+ assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size());
+ sendJmsMessage(getName());
+
+ // Something was supposed to be called on sendMessages
+ assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0);
+
+ conn.receiveFrame(10000);
+
+ ClientStompFrame frame = conn.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+ conn.sendFrame(frame);
+
+ conn.disconnect();
+
+ List<String> incomingCommands = new ArrayList<>(4);
+ incomingCommands.add("CONNECT");
+ incomingCommands.add("SUBSCRIBE");
+ incomingCommands.add("SEND");
+ incomingCommands.add("DISCONNECT");
+
+ List<String> outgoingCommands = new ArrayList<>(3);
+ outgoingCommands.add("CONNECTED");
+ outgoingCommands.add("MESSAGE");
+ outgoingCommands.add("MESSAGE");
+
+ long timeout = System.currentTimeMillis() + 1000;
+
+ // Things are async, giving some time to things arrive before we actually assert
+ while (MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 &&
+ MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3 &&
+ timeout > System.currentTimeMillis()) {
+ Thread.sleep(10);
+ }
+
+ Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size());
+ Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size());
+
+ for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++) {
+ Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand());
+ Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
+ }
+
+ for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++) {
+ Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand());
+ }
+
+ Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
+ Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
+ }
+
+ public static class MyCoreInterceptor implements Interceptor {
+
+ static List<Packet> incomingInterceptedFrames = new ArrayList<>();
+
+ @Override
+ public boolean intercept(Packet packet, RemotingConnection connection) {
+ IntegrationTestLogger.LOGGER.info("Core intercepted: " + packet);
+ incomingInterceptedFrames.add(packet);
+ return true;
+ }
+ }
+
+ public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor {
+
+ static List<StompFrame> incomingInterceptedFrames = new ArrayList<>();
+
+ @Override
+ public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
+ incomingInterceptedFrames.add(stompFrame);
+ stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal");
+ return true;
+ }
+ }
+
+ public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor {
+
+ static List<StompFrame> outgoingInterceptedFrames = new ArrayList<>();
+
+ @Override
+ public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
+ outgoingInterceptedFrames.add(stompFrame);
+ stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal");
+ return true;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java
new file mode 100644
index 0000000..929534f
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithLargeMessages.java
@@ -0,0 +1,438 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.TransportConfiguration;
+import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
+import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
+import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
+import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
+import org.apache.activemq.artemis.core.security.Role;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServers;
+import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.jms.server.config.JMSConfiguration;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.config.impl.TopicConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQJAASSecurityManager;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class StompTestWithLargeMessages extends StompTestBase {
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ }
+
+ @Override
+ public boolean isCompressLargeMessages() {
+ return true;
+ }
+
+ @Override
+ public boolean isPersistenceEnabled() {
+ return true;
+ }
+
+ @Override
+ public Integer getStompMinLargeMessageSize() {
+ return 2048;
+ }
+
+ //stomp sender -> large -> stomp receiver
+ @Test
+ public void testSendReceiveLargePersistentMessages() throws Exception {
+ StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect(defUser, defPass);
+
+ int count = 10;
+ int msgSize = 1024 * 1024;
+ char[] contents = new char[msgSize];
+ for (int i = 0; i < msgSize; i++) {
+ contents[i] = 'A';
+ }
+ String body = new String(contents);
+
+ for (int i = 0; i < count; i++) {
+ ClientStompFrame frame = conn.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("persistent", "true");
+ frame.setBody(body);
+ conn.sendFrame(frame);
+ }
+
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("subscription-type", "ANYCAST");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+ conn.sendFrame(subFrame);
+
+ for (int i = 0; i < count; i++) {
+ ClientStompFrame frame = conn.receiveFrame(60000);
+ Assert.assertNotNull(frame);
+ System.out.println("part of frame: " + frame.getBody().substring(0, 200));
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName()));
+ int index = frame.getBody().indexOf("AAAA");
+ assertEquals(msgSize, (frame.getBody().length() - index));
+ }
+
+ ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ unsubFrame.addHeader("receipt", "567");
+ ClientStompFrame response = conn.sendFrame(unsubFrame);
+ assertNotNull(response);
+ assertNotNull(response.getCommand().equals("RECEIPT"));
+
+ conn.disconnect();
+ }
+
+ //core sender -> large -> stomp receiver
+ @Test
+ public void testReceiveLargePersistentMessagesFromCore() throws Exception {
+ StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect(defUser, defPass);
+
+ int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+ char[] contents = new char[msgSize];
+ for (int i = 0; i < msgSize; i++) {
+ contents[i] = 'B';
+ }
+ String msg = new String(contents);
+
+ int count = 10;
+ for (int i = 0; i < count; i++) {
+ this.sendJmsMessage(msg);
+ }
+
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("subscription-type", "ANYCAST");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+ conn.sendFrame(subFrame);
+
+ for (int i = 0; i < count; i++) {
+ ClientStompFrame frame = conn.receiveFrame(60000);
+ Assert.assertNotNull(frame);
+ System.out.println("part of frame: " + frame.getBody().substring(0, 200));
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName()));
+ int index = frame.getBody().indexOf("BBB");
+ assertEquals(msgSize, (frame.getBody().length() - index));
+ }
+
+ ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ unsubFrame.addHeader("receipt", "567");
+ ClientStompFrame response = conn.sendFrame(unsubFrame);
+ assertNotNull(response);
+ assertNotNull(response.getCommand().equals("RECEIPT"));
+
+ conn.disconnect();
+ }
+
+ //stomp v12 sender -> large -> stomp v12 receiver
+ @Test
+ public void testSendReceiveLargePersistentMessagesV12() throws Exception {
+ StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
+ connV12.connect(defUser, defPass);
+
+ int count = 10;
+ int szBody = 1024 * 1024;
+ char[] contents = new char[szBody];
+ for (int i = 0; i < szBody; i++) {
+ contents[i] = 'A';
+ }
+ String body = new String(contents);
+
+ ClientStompFrame frame = connV12.createFrame("SEND");
+ frame.addHeader("destination-type", "ANYCAST");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.addHeader("persistent", "true");
+ frame.setBody(body);
+
+ for (int i = 0; i < count; i++) {
+ connV12.sendFrame(frame);
+ }
+
+ ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("subscription-type", "ANYCAST");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ connV12.sendFrame(subFrame);
+
+ for (int i = 0; i < count; i++) {
+ ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
+
+ Assert.assertNotNull(receiveFrame);
+ System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
+ Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
+ Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
+ assertEquals(szBody, receiveFrame.getBody().length());
+ }
+
+ // remove susbcription
+ ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ connV12.sendFrame(unsubFrame);
+
+ connV12.disconnect();
+ }
+
+ //core sender -> large -> stomp v12 receiver
+ @Test
+ public void testReceiveLargePersistentMessagesFromCoreV12() throws Exception {
+ int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
+ char[] contents = new char[msgSize];
+ for (int i = 0; i < msgSize; i++) {
+ contents[i] = 'B';
+ }
+ String msg = new String(contents);
+
+ int count = 10;
+ for (int i = 0; i < count; i++) {
+ this.sendJmsMessage(msg);
+ }
+
+ StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
+ connV12.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("subscription-type", "ANYCAST");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+ connV12.sendFrame(subFrame);
+
+ for (int i = 0; i < count; i++) {
+ ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
+
+ Assert.assertNotNull(receiveFrame);
+ System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
+ Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
+ Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
+ assertEquals(msgSize, receiveFrame.getBody().length());
+ }
+
+ // remove susbcription
+ ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ connV12.sendFrame(unsubFrame);
+
+ connV12.disconnect();
+ }
+
+ //core sender -> large (compressed regular) -> stomp v10 receiver
+ @Test
+ public void testReceiveLargeCompressedToRegularPersistentMessagesFromCore() throws Exception {
+ StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect(defUser, defPass);
+
+ LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
+ LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ char[] contents = input.toArray();
+ String msg = new String(contents);
+
+ String leadingPart = msg.substring(0, 100);
+
+ int count = 10;
+ for (int i = 0; i < count; i++) {
+ this.sendJmsMessage(msg);
+ }
+
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("subscription-type", "ANYCAST");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+ conn.sendFrame(subFrame);
+
+ for (int i = 0; i < count; i++) {
+ ClientStompFrame receiveFrame = conn.receiveFrame(30000);
+ Assert.assertNotNull(receiveFrame);
+ System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 250));
+ Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
+ Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
+ int index = receiveFrame.getBody().indexOf(leadingPart);
+ assertEquals(msg.length(), (receiveFrame.getBody().length() - index));
+ }
+
+ // remove suscription
+ ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ unsubFrame.addHeader("receipt", "567");
+ ClientStompFrame response = conn.sendFrame(unsubFrame);
+ assertNotNull(response);
+ assertNotNull(response.getCommand().equals("RECEIPT"));
+
+ conn.disconnect();
+ }
+
+ //core sender -> large (compressed regular) -> stomp v12 receiver
+ @Test
+ public void testReceiveLargeCompressedToRegularPersistentMessagesFromCoreV12() throws Exception {
+ LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
+ LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ char[] contents = input.toArray();
+ String msg = new String(contents);
+
+ int count = 10;
+ for (int i = 0; i < count; i++) {
+ this.sendJmsMessage(msg);
+ }
+
+ StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
+ connV12.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("subscription-type", "ANYCAST");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ connV12.sendFrame(subFrame);
+
+ for (int i = 0; i < count; i++) {
+ ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
+
+ Assert.assertNotNull(receiveFrame);
+ System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
+ Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
+ Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
+ assertEquals(contents.length, receiveFrame.getBody().length());
+ }
+
+ // remove susbcription
+ ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ connV12.sendFrame(unsubFrame);
+
+ connV12.disconnect();
+ }
+
+ //core sender -> large (compressed large) -> stomp v12 receiver
+ @Test
+ public void testReceiveLargeCompressedToLargePersistentMessagesFromCoreV12() throws Exception {
+ LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
+ input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+ LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ char[] contents = input.toArray();
+ String msg = new String(contents);
+
+ int count = 10;
+ for (int i = 0; i < count; i++) {
+ this.sendJmsMessage(msg);
+ }
+
+ IntegrationTestLogger.LOGGER.info("Message count for " + getQueueName() + ": " + server.getActiveMQServer().locateQueue(SimpleString.toSimpleString(getQueueName())).getMessageCount());
+
+ StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", hostname, port);
+ connV12.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
+ subFrame.addHeader("id", "a-sub");
+ subFrame.addHeader("subscription-type", "ANYCAST");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+
+ connV12.sendFrame(subFrame);
+
+ for (int i = 0; i < count; i++) {
+ ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
+
+ Assert.assertNotNull(receiveFrame);
+ System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
+ Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
+ Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
+ assertEquals(contents.length, receiveFrame.getBody().length());
+ }
+
+ // remove susbcription
+ ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("id", "a-sub");
+ connV12.sendFrame(unsubFrame);
+
+ connV12.disconnect();
+ }
+
+ //core sender -> large (compressed large) -> stomp v10 receiver
+ @Test
+ public void testReceiveLargeCompressedToLargePersistentMessagesFromCore() throws Exception {
+ LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
+ input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+ LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
+
+ char[] contents = input.toArray();
+ String msg = new String(contents);
+
+ String leadingPart = msg.substring(0, 100);
+
+ int count = 10;
+ for (int i = 0; i < count; i++) {
+ this.sendJmsMessage(msg);
+ }
+
+ StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect(defUser, defPass);
+
+ ClientStompFrame subFrame = conn.createFrame("SUBSCRIBE");
+ subFrame.addHeader("subscription-type", "ANYCAST");
+ subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ subFrame.addHeader("ack", "auto");
+ conn.sendFrame(subFrame);
+
+ for (int i = 0; i < count; i++) {
+ ClientStompFrame frame = conn.receiveFrame(60000);
+ Assert.assertNotNull(frame);
+ System.out.println("part of frame: " + frame.getBody().substring(0, 250));
+ Assert.assertTrue(frame.getCommand().equals("MESSAGE"));
+ Assert.assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName()));
+ int index = frame.getBody().toString().indexOf(leadingPart);
+ assertEquals(msg.length(), (frame.getBody().toString().length() - index));
+ }
+
+ ClientStompFrame unsubFrame = conn.createFrame("UNSUBSCRIBE");
+ unsubFrame.addHeader("destination", getQueuePrefix() + getQueueName());
+ unsubFrame.addHeader("receipt", "567");
+ conn.sendFrame(unsubFrame);
+
+ conn.disconnect();
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java
new file mode 100644
index 0000000..fc44437
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithMessageID.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.stomp;
+
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.QueueBrowser;
+import javax.jms.TextMessage;
+import java.util.Enumeration;
+
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class StompTestWithMessageID extends StompTestBase {
+
+ public boolean isEnableStompMessageId() {
+ return true;
+ }
+
+ @Test
+ public void testEnableMessageID() throws Exception {
+ StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect(defUser, defPass);
+
+ ClientStompFrame frame = conn.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World 1");
+ conn.sendFrame(frame);
+
+ frame = conn.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World 2");
+ conn.sendFrame(frame);
+
+ QueueBrowser browser = session.createBrowser(queue);
+
+ Enumeration enu = browser.getEnumeration();
+
+ while (enu.hasMoreElements()) {
+ Message msg = (Message) enu.nextElement();
+ String msgId = msg.getStringProperty("amqMessageId");
+ assertNotNull(msgId);
+ assertTrue(msgId.indexOf("STOMP") == 0);
+ }
+
+ browser.close();
+
+ MessageConsumer consumer = session.createConsumer(queue);
+
+ TextMessage message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+
+ message = (TextMessage) consumer.receive(1000);
+ Assert.assertNotNull(message);
+
+ message = (TextMessage) consumer.receive(2000);
+ Assert.assertNull(message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java
index e9d5550..a6ce6c9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompTestWithSecurity.java
@@ -19,27 +19,34 @@ package org.apache.activemq.artemis.tests.integration.stomp;
import javax.jms.MessageConsumer;
import javax.jms.TextMessage;
-import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
+import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
public class StompTestWithSecurity extends StompTestBase {
+ @Override
+ public boolean isSecurityEnabled() {
+ return true;
+ }
+
@Test
public void testJMSXUserID() throws Exception {
server.getActiveMQServer().getConfiguration().setPopulateValidatedUser(true);
MessageConsumer consumer = session.createConsumer(queue);
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- Assert.assertTrue(frame.startsWith("CONNECTED"));
+ StompClientConnection conn = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
+ conn.connect(defUser, defPass);
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
+ ClientStompFrame frame = conn.createFrame("SEND");
+ frame.addHeader("destination", getQueuePrefix() + getQueueName());
+ frame.setBody("Hello World");
+ conn.sendFrame(frame);
- sendFrame(frame);
+ conn.disconnect();
TextMessage message = (TextMessage) consumer.receive(1000);
Assert.assertNotNull(message);
@@ -54,9 +61,4 @@ public class StompTestWithSecurity extends StompTestBase {
long tmsg = message.getJMSTimestamp();
Assert.assertTrue(Math.abs(tnow - tmsg) < 1000);
}
-
- @Override
- public boolean isSecurityEnabled() {
- return true;
- }
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
index 06771bb..c48fd8d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
@@ -24,9 +24,9 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
-public abstract class AbstractClientStompFrame implements ClientStompFrame {
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
- protected static final String HEADER_RECEIPT = "receipt";
+public abstract class AbstractClientStompFrame implements ClientStompFrame {
protected static final Set<String> validCommands = new HashSet<>();
protected String command;
@@ -36,19 +36,19 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
protected String EOL = "\n";
static {
- validCommands.add("CONNECT");
- validCommands.add("CONNECTED");
- validCommands.add("SEND");
- validCommands.add("RECEIPT");
- validCommands.add("SUBSCRIBE");
- validCommands.add("UNSUBSCRIBE");
- validCommands.add("MESSAGE");
- validCommands.add("BEGIN");
- validCommands.add("COMMIT");
- validCommands.add("ABORT");
- validCommands.add("ACK");
- validCommands.add("DISCONNECT");
- validCommands.add("ERROR");
+ validCommands.add(Stomp.Commands.CONNECT);
+ validCommands.add(Stomp.Responses.CONNECTED);
+ validCommands.add(Stomp.Commands.SEND);
+ validCommands.add(Stomp.Responses.RECEIPT);
+ validCommands.add(Stomp.Commands.SUBSCRIBE);
+ validCommands.add(Stomp.Commands.UNSUBSCRIBE);
+ validCommands.add(Stomp.Responses.MESSAGE);
+ validCommands.add(Stomp.Commands.BEGIN);
+ validCommands.add(Stomp.Commands.COMMIT);
+ validCommands.add(Stomp.Commands.ABORT);
+ validCommands.add(Stomp.Commands.ACK);
+ validCommands.add(Stomp.Commands.DISCONNECT);
+ validCommands.add(Stomp.Responses.ERROR);
}
public AbstractClientStompFrame(String command) {
@@ -80,37 +80,15 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
@Override
public ByteBuffer toByteBuffer() {
- if (isPing()) {
- ByteBuffer buffer = ByteBuffer.allocateDirect(1);
- buffer.put((byte) 0x0A);
- buffer.rewind();
- return buffer;
- }
- StringBuffer sb = new StringBuffer();
- sb.append(command + EOL);
- int n = headers.size();
- for (int i = 0; i < n; i++) {
- sb.append(headers.get(i).key + ":" + headers.get(i).val + EOL);
- }
- sb.append(EOL);
- if (body != null) {
- sb.append(body);
- }
- sb.append((char) 0);
-
- String data = sb.toString();
-
- byte[] byteValue = data.getBytes(StandardCharsets.UTF_8);
-
- ByteBuffer buffer = ByteBuffer.allocateDirect(byteValue.length);
- buffer.put(byteValue);
-
- buffer.rewind();
- return buffer;
+ return toByteBufferInternal(null);
}
@Override
public ByteBuffer toByteBufferWithExtra(String str) {
+ return toByteBufferInternal(str);
+ }
+
+ public ByteBuffer toByteBufferInternal(String str) {
StringBuffer sb = new StringBuffer();
sb.append(command + EOL);
int n = headers.size();
@@ -122,7 +100,9 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
sb.append(body);
}
sb.append((char) 0);
- sb.append(str);
+ if (str != null) {
+ sb.append(str);
+ }
String data = sb.toString();
@@ -137,26 +117,29 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
@Override
public boolean needsReply() {
- if ("CONNECT".equals(command) || headerKeys.contains(HEADER_RECEIPT)) {
+ if (Stomp.Commands.CONNECT.equals(command) || headerKeys.contains(Stomp.Headers.RECEIPT_REQUESTED)) {
return true;
}
return false;
}
@Override
- public void setCommand(String command) {
+ public ClientStompFrame setCommand(String command) {
this.command = command;
+ return this;
}
@Override
- public void addHeader(String key, String val) {
+ public ClientStompFrame addHeader(String key, String val) {
headers.add(new Header(key, val));
headerKeys.add(key);
+ return this;
}
@Override
- public void setBody(String body) {
+ public ClientStompFrame setBody(String body) {
this.body = body;
+ return this;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
index ce94ec3..d8a487e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
@@ -27,29 +27,12 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
public abstract class AbstractStompClientConnection implements StompClientConnection {
- public static final String STOMP_COMMAND = "STOMP";
-
- public static final String ACCEPT_HEADER = "accept-version";
- public static final String HOST_HEADER = "host";
- public static final String VERSION_HEADER = "version";
- public static final String RECEIPT_HEADER = "receipt";
-
- protected static final String CONNECT_COMMAND = "CONNECT";
- protected static final String CONNECTED_COMMAND = "CONNECTED";
- protected static final String DISCONNECT_COMMAND = "DISCONNECT";
-
- protected static final String LOGIN_HEADER = "login";
- protected static final String PASSCODE_HEADER = "passcode";
-
- //ext
- protected static final String CLIENT_ID_HEADER = "client-id";
-
protected Pinger pinger;
-
protected String version;
protected String host;
protected int port;
@@ -58,13 +41,10 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
protected StompFrameFactory factory;
protected final SocketChannel socketChannel;
protected ByteBuffer readBuffer;
-
protected List<Byte> receiveList;
-
protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>();
-
protected boolean connected = false;
- private int serverPingCounter;
+ protected int serverPingCounter;
public AbstractStompClientConnection(String version, String host, int port) throws IOException {
this.version = version;
@@ -90,11 +70,15 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
new ReaderThread().start();
}
- @Override
- public ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException {
+ private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException {
ClientStompFrame response = null;
- IntegrationTestLogger.LOGGER.info("Sending frame:\n" + frame);
- ByteBuffer buffer = frame.toByteBuffer();
+ IntegrationTestLogger.LOGGER.info("Sending " + (wicked ? "*wicked* " : "") + "frame:\n" + frame);
+ ByteBuffer buffer;
+ if (wicked) {
+ buffer = frame.toByteBufferWithExtra("\n");
+ } else {
+ buffer = frame.toByteBuffer();
+ }
while (buffer.remaining() > 0) {
socketChannel.write(buffer);
}
@@ -105,7 +89,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
//filter out server ping
while (response != null) {
- if (response.getCommand().equals("STOMP")) {
+ if (response.getCommand().equals(Stomp.Commands.STOMP)) {
response = receiveFrame();
} else {
break;
@@ -113,32 +97,19 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
}
}
+ IntegrationTestLogger.LOGGER.info("Received:\n" + response);
+
return response;
}
@Override
- public ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException {
- ClientStompFrame response = null;
- ByteBuffer buffer = frame.toByteBufferWithExtra("\n");
-
- while (buffer.remaining() > 0) {
- socketChannel.write(buffer);
- }
-
- //now response
- if (frame.needsReply()) {
- response = receiveFrame();
+ public ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException {
+ return sendFrameInternal(frame, false);
+ }
- //filter out server ping
- while (response != null) {
- if (response.getCommand().equals("STOMP")) {
- response = receiveFrame();
- } else {
- break;
- }
- }
- }
- return response;
+ @Override
+ public ClientStompFrame sendWickedFrame(ClientStompFrame frame) throws IOException, InterruptedException {
+ return sendFrameInternal(frame, true);
}
@Override
@@ -186,17 +157,12 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
readBuffer.rewind();
}
- @Override
- public int getServerPingNumber() {
- return serverPingCounter;
- }
-
protected void incrementServerPing() {
serverPingCounter++;
}
private boolean validateFrame(ClientStompFrame f) {
- String h = f.getHeader("content-length");
+ String h = f.getHeader(Stomp.Headers.CONTENT_LENGTH);
if (h != null) {
int len = Integer.valueOf(h);
if (f.getBody().getBytes(StandardCharsets.UTF_8).length < len) {
@@ -271,34 +237,15 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
return this.frameQueue.size();
}
- @Override
- public void startPinger(long interval) {
- pinger = new Pinger(interval);
- pinger.startPing();
- }
-
- @Override
- public void stopPinger() {
- if (pinger != null) {
- pinger.stopPing();
- try {
- pinger.join();
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- pinger = null;
- }
- }
-
- private class Pinger extends Thread {
+ protected class Pinger extends Thread {
long pingInterval;
ClientStompFrame pingFrame;
volatile boolean stop = false;
- private Pinger(long interval) {
+ Pinger(long interval) {
this.pingInterval = interval;
- pingFrame = createFrame("STOMP");
+ pingFrame = createFrame(Stomp.Commands.STOMP);
pingFrame.setBody("\n");
pingFrame.setForceOneway();
pingFrame.setPing(true);
@@ -329,5 +276,4 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
}
}
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
index 53bced4..93801f9 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
@@ -27,11 +27,11 @@ public interface ClientStompFrame {
boolean needsReply();
- void setCommand(String command);
+ ClientStompFrame setCommand(String command);
- void addHeader(String string, String string2);
+ ClientStompFrame addHeader(String string, String string2);
- void setBody(String string);
+ ClientStompFrame setBody(String string);
String getCommand();
@@ -43,8 +43,8 @@ public interface ClientStompFrame {
boolean isPing();
- void setForceOneway();
+ ClientStompFrame setForceOneway();
- void setPing(boolean b);
+ ClientStompFrame setPing(boolean b);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java
index 5273236..92629ab 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV10.java
@@ -30,18 +30,18 @@ public class ClientStompFrameV10 extends AbstractClientStompFrame {
}
@Override
- public boolean isPing() {
- return false;
+ public ClientStompFrame setForceOneway() {
+ throw new IllegalStateException("Doesn't apply with V1.0!");
}
@Override
- public void setForceOneway() {
+ public ClientStompFrame setPing(boolean b) {
throw new IllegalStateException("Doesn't apply with V1.0!");
}
@Override
- public void setPing(boolean b) {
- throw new IllegalStateException("Doesn't apply with V1.0!");
+ public boolean isPing() {
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java
index 22d7146..4d8d1e0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV11.java
@@ -16,14 +16,13 @@
*/
package org.apache.activemq.artemis.tests.integration.stomp.util;
-/**
- * pls use factory to create frames.
- */
-public class ClientStompFrameV11 extends AbstractClientStompFrame {
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+
+public class ClientStompFrameV11 extends ClientStompFrameV10 {
static {
- validCommands.add("NACK");
- validCommands.add("STOMP");
+ validCommands.add(Stomp.Commands.NACK);
+ validCommands.add(Stomp.Commands.STOMP);
}
boolean forceOneway = false;
@@ -38,8 +37,9 @@ public class ClientStompFrameV11 extends AbstractClientStompFrame {
}
@Override
- public void setForceOneway() {
+ public ClientStompFrame setForceOneway() {
forceOneway = true;
+ return this;
}
@Override
@@ -47,15 +47,17 @@ public class ClientStompFrameV11 extends AbstractClientStompFrame {
if (forceOneway)
return false;
- if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT)) {
+ if (Stomp.Commands.STOMP.equals(command)) {
return true;
}
- return false;
+
+ return super.needsReply();
}
@Override
- public void setPing(boolean b) {
+ public ClientStompFrame setPing(boolean b) {
isPing = b;
+ return this;
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java
index 5ca530e..eaffd1c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrameV12.java
@@ -16,17 +16,7 @@
*/
package org.apache.activemq.artemis.tests.integration.stomp.util;
-/**
- */
-public class ClientStompFrameV12 extends AbstractClientStompFrame {
-
- static {
- validCommands.add("NACK");
- validCommands.add("STOMP");
- }
-
- boolean forceOneway = false;
- boolean isPing = false;
+public class ClientStompFrameV12 extends ClientStompFrameV11 {
public ClientStompFrameV12(String command) {
this(command, true, true);
@@ -45,32 +35,6 @@ public class ClientStompFrameV12 extends AbstractClientStompFrame {
}
@Override
- public void setForceOneway() {
- forceOneway = true;
- }
-
- @Override
- public boolean needsReply() {
- if (forceOneway)
- return false;
-
- if ("CONNECT".equals(command) || "STOMP".equals(command) || headerKeys.contains(HEADER_RECEIPT)) {
- return true;
- }
- return false;
- }
-
- @Override
- public void setPing(boolean b) {
- isPing = b;
- }
-
- @Override
- public boolean isPing() {
- return isPing;
- }
-
- @Override
public String toString() {
return "[1.2]" + super.toString();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
index 12f52d0..7be09a5 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
@@ -18,9 +18,6 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
-/**
- * pls use factory to create frames.
- */
public interface StompClientConnection {
ClientStompFrame sendFrame(ClientStompFrame frame) throws IOException, InterruptedException;
@@ -35,7 +32,7 @@ public interface StompClientConnection {
ClientStompFrame connect(String defUser, String defPass) throws Exception;
- void connect(String defUser, String defPass, String clientId) throws Exception;
+ ClientStompFrame connect(String defUser, String defPass, String clientId) throws Exception;
boolean isConnected();