You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/11 19:23:38 UTC
[08/11] activemq-artemis git commit: Stomp refactor + track
autocreation for addresses
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
index 159a285..8822015 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/HangConsumerTest.java
@@ -513,7 +513,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
}
/* (non-Javadoc)
- * @see SessionCallback#sendMessage(org.apache.activemq.artemis.core.server.ServerMessage, long, int)
+ * @see SessionCallback#sendMessage(MessageReference, ServerMessage, ServerConsumer, int)
*/
@Override
public int sendMessage(MessageReference ref, ServerMessage message, ServerConsumer consumer, int deliveryCount) {
@@ -592,7 +592,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
SessionCallback callback,
OperationContext context,
boolean autoCreateQueue) throws Exception {
- return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, null, getPagingManager());
+ return new ServerSessionImpl(name, username, password, validatedUser, minLargeMessageSize, autoCommitSends, autoCommitAcks, preAcknowledge, getConfiguration().isPersistDeliveryCountBeforeDelivery(), xa, connection, getStorageManager(), getPostOffice(), getResourceManager(), getSecurityStore(), getManagementService(), this, getConfiguration().getManagementAddress(), defaultAddress == null ? null : new SimpleString(defaultAddress), new MyCallback(callback), context, getPagingManager());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java
deleted file mode 100644
index 6917cfb..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ConcurrentStompTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.stomp;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ConcurrentStompTest extends StompTestBase {
-
- private Socket stompSocket_2;
-
- private ByteArrayOutputStream inputBuffer_2;
-
- /**
- * Send messages on 1 socket and receives them concurrently on another socket.
- */
- @Test
- public void testSendManyMessages() throws Exception {
- try {
- String connect = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-
- sendFrame(connect);
- String connected = receiveFrame(10000);
- Assert.assertTrue(connected.startsWith("CONNECTED"));
-
- stompSocket_2 = createSocket();
- inputBuffer_2 = new ByteArrayOutputStream();
-
- sendFrame(stompSocket_2, connect);
- connected = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
- Assert.assertTrue(connected.startsWith("CONNECTED"));
-
- final int count = 1000;
- final CountDownLatch latch = new CountDownLatch(count);
-
- String subscribe = "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(stompSocket_2, subscribe);
- Thread.sleep(2000);
-
- new Thread() {
- @Override
- public void run() {
- int i = 0;
- while (true) {
- try {
- String frame = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- System.out.println("<<< " + i++);
- latch.countDown();
- } catch (Exception e) {
- break;
- }
- }
- }
- }.start();
-
- String send = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n";
- for (int i = 1; i <= count; i++) {
- // Thread.sleep(1);
- System.out.println(">>> " + i);
- sendFrame(send + "count:" + i + "\n\n" + Stomp.NULL);
- }
-
- assertTrue(latch.await(60, TimeUnit.SECONDS));
-
- } finally {
- stompSocket_2.close();
- inputBuffer_2.close();
- }
-
- }
-
- // Implementation methods
- // -------------------------------------------------------------------------
- public void sendFrame(Socket socket, String data) throws Exception {
- byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
- OutputStream outputStream = socket.getOutputStream();
- for (byte b : bytes) {
- outputStream.write(b);
- }
- outputStream.flush();
- }
-
- public String receiveFrame(Socket socket, ByteArrayOutputStream input, long timeOut) throws Exception {
- socket.setSoTimeout((int) timeOut);
- InputStream is = socket.getInputStream();
- int c = 0;
- for (;;) {
- c = is.read();
- if (c < 0) {
- throw new IOException("socket closed.");
- } else if (c == 0) {
- c = is.read();
- if (c != '\n') {
- byte[] ba = input.toByteArray();
- System.out.println(new String(ba, StandardCharsets.UTF_8));
- }
- Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
- byte[] ba = input.toByteArray();
- input.reset();
- return new String(ba, StandardCharsets.UTF_8);
- } else {
- input.write(c);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java
deleted file mode 100644
index a0dcdbf..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/ExtraStompTest.java
+++ /dev/null
@@ -1,848 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.stomp;
-
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.QueueBrowser;
-import javax.jms.TextMessage;
-import java.util.ArrayList;
-import java.util.Enumeration;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.activemq.artemis.api.core.Interceptor;
-import org.apache.activemq.artemis.api.core.TransportConfiguration;
-import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
-import org.apache.activemq.artemis.core.config.Configuration;
-import org.apache.activemq.artemis.core.protocol.core.Packet;
-import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
-import org.apache.activemq.artemis.core.protocol.stomp.StompFrame;
-import org.apache.activemq.artemis.core.protocol.stomp.StompFrameInterceptor;
-import org.apache.activemq.artemis.core.protocol.stomp.StompProtocolManagerFactory;
-import org.apache.activemq.artemis.core.registry.JndiBindingRegistry;
-import org.apache.activemq.artemis.core.remoting.impl.invm.InVMAcceptorFactory;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyAcceptorFactory;
-import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServers;
-import org.apache.activemq.artemis.jms.server.JMSServerManager;
-import org.apache.activemq.artemis.jms.server.config.JMSConfiguration;
-import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
-import org.apache.activemq.artemis.jms.server.config.impl.JMSQueueConfigurationImpl;
-import org.apache.activemq.artemis.jms.server.config.impl.TopicConfigurationImpl;
-import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.tests.integration.largemessage.LargeMessageTestBase;
-import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
-import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
-import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
-import org.apache.activemq.artemis.tests.unit.util.InVMNamingContext;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-public class ExtraStompTest extends StompTestBase {
-
- @Override
- @Before
- public void setUp() throws Exception {
- autoCreateServer = false;
- super.setUp();
- }
-
- @Test
- public void testConnectionTTL() throws Exception {
- try {
- server = createServerWithTTL("2000");
- server.start();
-
- setUpAfterServer();
-
- String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
- sendFrame(connect_frame);
-
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
- Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-
- String frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 1" + Stomp.NULL;
- sendFrame(frame);
-
- assertChannelClosed();
-
- MessageConsumer consumer = session.createConsumer(queue);
-
- TextMessage message = (TextMessage) consumer.receiveNoWait();
- Assert.assertNotNull(message);
-
- message = (TextMessage) consumer.receiveNoWait();
- Assert.assertNull(message);
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- @Test
- public void testEnableMessageID() throws Exception {
- enableMessageIDTest(true);
- }
-
- @Test
- public void testDisableMessageID() throws Exception {
- enableMessageIDTest(false);
- }
-
- @Test
- public void testDefaultEnableMessageID() throws Exception {
- enableMessageIDTest(null);
- }
-
- //stomp sender -> large -> stomp receiver
- @Test
- public void testSendReceiveLargePersistentMessages() throws Exception {
- try {
- server = createPersistentServerWithStompMinLargeSize(2048);
- server.start();
-
- setUpAfterServer();
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
-
- Assert.assertTrue(frame.startsWith("CONNECTED"));
- int count = 10;
- int szBody = 1024 * 1024;
- char[] contents = new char[szBody];
- for (int i = 0; i < szBody; i++) {
- contents[i] = 'A';
- }
- String body = new String(contents);
-
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "persistent:true\n" + "\n\n" + body + Stomp.NULL;
-
- for (int i = 0; i < count; i++) {
- sendFrame(frame);
- }
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
- sendFrame(frame);
-
- for (int i = 0; i < count; i++) {
- frame = receiveFrame(60000);
- Assert.assertNotNull(frame);
- System.out.println("part of frame: " + frame.substring(0, 200));
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- int index = frame.indexOf("AAAA");
- assertEquals(szBody, (frame.length() - index));
- }
-
- // remove suscription
- frame = "UNSUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "receipt:567\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- } catch (Exception ex) {
- ex.printStackTrace();
- throw ex;
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- //core sender -> large -> stomp receiver
- @Test
- public void testReceiveLargePersistentMessagesFromCore() throws Exception {
- try {
- server = createPersistentServerWithStompMinLargeSize(2048);
- server.start();
-
- setUpAfterServer();
-
- int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
- char[] contents = new char[msgSize];
- for (int i = 0; i < msgSize; i++) {
- contents[i] = 'B';
- }
- String msg = new String(contents);
-
- int count = 10;
- for (int i = 0; i < count; i++) {
- this.sendMessage(msg);
- }
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
-
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
- sendFrame(frame);
-
- for (int i = 0; i < count; i++) {
- frame = receiveFrame(60000);
- Assert.assertNotNull(frame);
- System.out.println("part of frame: " + frame.substring(0, 250));
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- int index = frame.indexOf("BBBB");
- assertEquals(msgSize, (frame.length() - index));
- }
-
- // remove suscription
- frame = "UNSUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "receipt:567\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- } catch (Exception ex) {
- ex.printStackTrace();
- throw ex;
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- //stomp v12 sender -> large -> stomp v12 receiver
- @Test
- public void testSendReceiveLargePersistentMessagesV12() throws Exception {
- try {
- server = createPersistentServerWithStompMinLargeSize(2048);
- server.start();
-
- setUpAfterServer();
-
- StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
- connV12.connect(defUser, defPass);
-
- int count = 10;
- int szBody = 1024 * 1024;
- char[] contents = new char[szBody];
- for (int i = 0; i < szBody; i++) {
- contents[i] = 'A';
- }
- String body = new String(contents);
-
- ClientStompFrame frame = connV12.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("persistent", "true");
- frame.setBody(body);
-
- for (int i = 0; i < count; i++) {
- connV12.sendFrame(frame);
- }
-
- ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- connV12.sendFrame(subFrame);
-
- for (int i = 0; i < count; i++) {
- ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
-
- Assert.assertNotNull(receiveFrame);
- System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
- Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
- Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
- assertEquals(szBody, receiveFrame.getBody().length());
- }
-
- // remove susbcription
- ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV12.sendFrame(unsubFrame);
-
- connV12.disconnect();
- } catch (Exception ex) {
- ex.printStackTrace();
- throw ex;
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- //core sender -> large -> stomp v12 receiver
- @Test
- public void testReceiveLargePersistentMessagesFromCoreV12() throws Exception {
- try {
- server = createPersistentServerWithStompMinLargeSize(2048);
- server.start();
-
- setUpAfterServer();
-
- int msgSize = 3 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE;
- char[] contents = new char[msgSize];
- for (int i = 0; i < msgSize; i++) {
- contents[i] = 'B';
- }
- String msg = new String(contents);
-
- int count = 10;
- for (int i = 0; i < count; i++) {
- this.sendMessage(msg);
- }
-
- StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
- connV12.connect(defUser, defPass);
-
- ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- connV12.sendFrame(subFrame);
-
- for (int i = 0; i < count; i++) {
- ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
-
- Assert.assertNotNull(receiveFrame);
- System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
- Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
- Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
- assertEquals(msgSize, receiveFrame.getBody().length());
- }
-
- // remove susbcription
- ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV12.sendFrame(unsubFrame);
-
- connV12.disconnect();
- } catch (Exception ex) {
- ex.printStackTrace();
- throw ex;
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- //core sender -> large (compressed regular) -> stomp v10 receiver
- @Test
- public void testReceiveLargeCompressedToRegularPersistentMessagesFromCore() throws Exception {
- try {
- server = createPersistentServerWithStompMinLargeSize(2048);
- server.start();
-
- setUpAfterServer(true);
-
- LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
- LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
- char[] contents = input.toArray();
- String msg = new String(contents);
-
- String leadingPart = msg.substring(0, 100);
-
- int count = 10;
- for (int i = 0; i < count; i++) {
- this.sendMessage(msg);
- }
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
-
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
- sendFrame(frame);
-
- for (int i = 0; i < count; i++) {
- frame = receiveFrame(60000);
- Assert.assertNotNull(frame);
- System.out.println("part of frame: " + frame.substring(0, 250));
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- int index = frame.indexOf(leadingPart);
- assertEquals(msg.length(), (frame.length() - index));
- }
-
- // remove suscription
- frame = "UNSUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "receipt:567\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- } catch (Exception ex) {
- ex.printStackTrace();
- throw ex;
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- //core sender -> large (compressed regular) -> stomp v12 receiver
- @Test
- public void testReceiveLargeCompressedToRegularPersistentMessagesFromCoreV12() throws Exception {
- try {
- server = createPersistentServerWithStompMinLargeSize(2048);
- server.start();
-
- setUpAfterServer(true);
-
- LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
- LargeMessageTestBase.adjustLargeCompression(true, input, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
- char[] contents = input.toArray();
- String msg = new String(contents);
-
- int count = 10;
- for (int i = 0; i < count; i++) {
- this.sendMessage(msg);
- }
-
- StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
- connV12.connect(defUser, defPass);
-
- ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- connV12.sendFrame(subFrame);
-
- for (int i = 0; i < count; i++) {
- ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
-
- Assert.assertNotNull(receiveFrame);
- System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
- Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
- Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
- assertEquals(contents.length, receiveFrame.getBody().length());
- }
-
- // remove susbcription
- ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV12.sendFrame(unsubFrame);
-
- connV12.disconnect();
- } catch (Exception ex) {
- ex.printStackTrace();
- throw ex;
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- //core sender -> large (compressed large) -> stomp v12 receiver
- @Test
- public void testReceiveLargeCompressedToLargePersistentMessagesFromCoreV12() throws Exception {
- try {
- server = createPersistentServerWithStompMinLargeSize(2048);
- server.start();
-
- setUpAfterServer(true);
-
- LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
- input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
- char[] contents = input.toArray();
- String msg = new String(contents);
-
- int count = 10;
- for (int i = 0; i < count; i++) {
- this.sendMessage(msg);
- }
-
- StompClientConnection connV12 = StompClientConnectionFactory.createClientConnection("1.2", "localhost", port);
- connV12.connect(defUser, defPass);
-
- ClientStompFrame subFrame = connV12.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- connV12.sendFrame(subFrame);
-
- for (int i = 0; i < count; i++) {
- ClientStompFrame receiveFrame = connV12.receiveFrame(30000);
-
- Assert.assertNotNull(receiveFrame);
- System.out.println("part of frame: " + receiveFrame.getBody().substring(0, 20));
- Assert.assertTrue(receiveFrame.getCommand().equals("MESSAGE"));
- Assert.assertEquals(receiveFrame.getHeader("destination"), getQueuePrefix() + getQueueName());
- assertEquals(contents.length, receiveFrame.getBody().length());
- }
-
- // remove susbcription
- ClientStompFrame unsubFrame = connV12.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV12.sendFrame(unsubFrame);
-
- connV12.disconnect();
- } catch (Exception ex) {
- ex.printStackTrace();
- throw ex;
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- //core sender -> large (compressed large) -> stomp v10 receiver
- @Test
- public void testReceiveLargeCompressedToLargePersistentMessagesFromCore() throws Exception {
- try {
- server = createPersistentServerWithStompMinLargeSize(2048);
- server.start();
-
- setUpAfterServer(true);
-
- LargeMessageTestBase.TestLargeMessageInputStream input = new LargeMessageTestBase.TestLargeMessageInputStream(ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, true);
- input.setSize(10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
- LargeMessageTestBase.adjustLargeCompression(false, input, 10 * ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE);
-
- char[] contents = input.toArray();
- String msg = new String(contents);
-
- String leadingPart = msg.substring(0, 100);
-
- int count = 10;
- for (int i = 0; i < count; i++) {
- this.sendMessage(msg);
- }
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
-
- Assert.assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
- sendFrame(frame);
-
- for (int i = 0; i < count; i++) {
- frame = receiveFrame(60000);
- Assert.assertNotNull(frame);
- System.out.println("part of frame: " + frame.substring(0, 250));
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- int index = frame.indexOf(leadingPart);
- assertEquals(msg.length(), (frame.length() - index));
- }
-
- // remove suscription
- frame = "UNSUBSCRIBE\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n" +
- "receipt:567\n" +
- "\n\n" +
- Stomp.NULL;
- sendFrame(frame);
- waitForReceipt();
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
- sendFrame(frame);
- } catch (Exception ex) {
- ex.printStackTrace();
- throw ex;
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- protected JMSServerManager createPersistentServerWithStompMinLargeSize(int sz) throws Exception {
- Map<String, Object> params = new HashMap<>();
- params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
- params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
- params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
- params.put(TransportConstants.STOMP_MIN_LARGE_MESSAGE_SIZE, sz);
- TransportConfiguration stompTransport = new TransportConfiguration(NettyAcceptorFactory.class.getName(), params);
-
- Configuration config = createBasicConfig().setPersistenceEnabled(true).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(InVMAcceptorFactory.class.getName()));
-
- ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
-
- JMSConfiguration jmsConfig = new JMSConfigurationImpl();
- jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setBindings(getQueueName()));
- jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName()));
- server = new JMSServerManagerImpl(activeMQServer, jmsConfig);
- server.setRegistry(new JndiBindingRegistry((new InVMNamingContext())));
- return server;
- }
-
- private void enableMessageIDTest(Boolean enable) throws Exception {
- try {
- server = createServerWithExtraStompOptions(null, enable);
- server.start();
-
- setUpAfterServer();
-
- String connect_frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n" + "request-id: 1\n" + "\n" + Stomp.NULL;
- sendFrame(connect_frame);
-
- String f = receiveFrame(10000);
- Assert.assertTrue(f.startsWith("CONNECTED"));
- Assert.assertTrue(f.indexOf("response-id:1") >= 0);
-
- String frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 1" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World 2" + Stomp.NULL;
-
- sendFrame(frame);
-
- QueueBrowser browser = session.createBrowser(queue);
-
- Enumeration enu = browser.getEnumeration();
-
- while (enu.hasMoreElements()) {
- Message msg = (Message) enu.nextElement();
- String msgId = msg.getStringProperty("amqMessageId");
- if (enable != null && enable.booleanValue()) {
- assertNotNull(msgId);
- assertTrue(msgId.indexOf("STOMP") == 0);
- } else {
- assertNull(msgId);
- }
- }
-
- browser.close();
-
- MessageConsumer consumer = session.createConsumer(queue);
-
- TextMessage message = (TextMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
-
- message = (TextMessage) consumer.receive(1000);
- Assert.assertNotNull(message);
-
- message = (TextMessage) consumer.receive(2000);
- Assert.assertNull(message);
- } finally {
- cleanUp();
- server.stop();
- }
- }
-
- protected JMSServerManager createServerWithTTL(String ttl) throws Exception {
- return createServerWithExtraStompOptions(ttl, null);
- }
-
- protected JMSServerManager createServerWithExtraStompOptions(String ttl, Boolean enableMessageID) throws Exception {
-
- Map<String, Object> params = new HashMap<>();
- params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
- params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
- if (ttl != null) {
- params.put(TransportConstants.CONNECTION_TTL, ttl);
- }
- if (enableMessageID != null) {
- params.put(TransportConstants.STOMP_ENABLE_MESSAGE_ID, enableMessageID);
- }
- params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
- TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-
- Configuration config = createBasicConfig().setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY));
-
- ActiveMQServer activeMQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
-
- JMSConfiguration jmsConfig = new JMSConfigurationImpl();
- jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(getQueueName()));
- jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName()));
- server = new JMSServerManagerImpl(activeMQServer, jmsConfig);
- server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
- return server;
- }
-
- public static class MyCoreInterceptor implements Interceptor {
-
- static List<Packet> incomingInterceptedFrames = new ArrayList<>();
-
- @Override
- public boolean intercept(Packet packet, RemotingConnection connection) {
- incomingInterceptedFrames.add(packet);
- return true;
- }
-
- }
-
- public static class MyIncomingStompFrameInterceptor implements StompFrameInterceptor {
-
- static List<StompFrame> incomingInterceptedFrames = new ArrayList<>();
-
- @Override
- public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
- incomingInterceptedFrames.add(stompFrame);
- stompFrame.addHeader("incomingInterceptedProp", "incomingInterceptedVal");
- return true;
- }
- }
-
- public static class MyOutgoingStompFrameInterceptor implements StompFrameInterceptor {
-
- static List<StompFrame> outgoingInterceptedFrames = new ArrayList<>();
-
- @Override
- public boolean intercept(StompFrame stompFrame, RemotingConnection connection) {
- outgoingInterceptedFrames.add(stompFrame);
- stompFrame.addHeader("outgoingInterceptedProp", "outgoingInterceptedVal");
- return true;
- }
- }
-
- @Test
- public void stompFrameInterceptor() throws Exception {
- MyIncomingStompFrameInterceptor.incomingInterceptedFrames.clear();
- MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.clear();
- try {
- List<String> incomingInterceptorList = new ArrayList<>();
- incomingInterceptorList.add("org.apache.activemq.artemis.tests.integration.stomp.ExtraStompTest$MyIncomingStompFrameInterceptor");
- incomingInterceptorList.add("org.apache.activemq.artemis.tests.integration.stomp.ExtraStompTest$MyCoreInterceptor");
- List<String> outgoingInterceptorList = new ArrayList<>();
- outgoingInterceptorList.add("org.apache.activemq.artemis.tests.integration.stomp.ExtraStompTest$MyOutgoingStompFrameInterceptor");
-
- server = createServerWithStompInterceptor(incomingInterceptorList, outgoingInterceptorList);
- server.start();
-
- setUpAfterServer(); // This will make some calls through core
-
- // So we clear them here
- MyCoreInterceptor.incomingInterceptedFrames.clear();
-
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(100000);
-
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\nfff" + Stomp.NULL;
- sendFrame(frame);
-
- assertEquals(0, MyCoreInterceptor.incomingInterceptedFrames.size());
- sendMessage(getName());
-
- // Something was supposed to be called on sendMessages
- assertTrue("core interceptor is not working", MyCoreInterceptor.incomingInterceptedFrames.size() > 0);
-
- receiveFrame(10000);
-
- frame = "SEND\n" + "destination:" +
- getQueuePrefix() +
- getQueueName() +
- "\n\n" +
- "Hello World" +
- Stomp.NULL;
- sendFrame(frame);
-
- receiveFrame(10000);
-
- frame = "DISCONNECT\n" + "\n\n" + Stomp.NULL;
-
- sendFrame(frame);
-
- } finally {
- cleanUp();
- server.stop();
- }
-
- List<String> incomingCommands = new ArrayList<>(4);
- incomingCommands.add("CONNECT");
- incomingCommands.add("SUBSCRIBE");
- incomingCommands.add("SEND");
- incomingCommands.add("DISCONNECT");
-
- List<String> outgoingCommands = new ArrayList<>(3);
- outgoingCommands.add("CONNECTED");
- outgoingCommands.add("MESSAGE");
- outgoingCommands.add("MESSAGE");
-
- long timeout = System.currentTimeMillis() + 1000;
-
- // Things are async, giving some time to things arrive before we actually assert
- while (MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size() < 4 &&
- MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size() < 3 &&
- timeout > System.currentTimeMillis()) {
- Thread.sleep(10);
- }
-
- Assert.assertEquals(4, MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size());
- Assert.assertEquals(3, MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size());
-
- for (int i = 0; i < MyIncomingStompFrameInterceptor.incomingInterceptedFrames.size(); i++) {
- Assert.assertEquals(incomingCommands.get(i), MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getCommand());
- Assert.assertEquals("incomingInterceptedVal", MyIncomingStompFrameInterceptor.incomingInterceptedFrames.get(i).getHeader("incomingInterceptedProp"));
- }
-
- for (int i = 0; i < MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.size(); i++) {
- Assert.assertEquals(outgoingCommands.get(i), MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(i).getCommand());
- }
-
- Assert.assertEquals("incomingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("incomingInterceptedProp"));
- Assert.assertEquals("outgoingInterceptedVal", MyOutgoingStompFrameInterceptor.outgoingInterceptedFrames.get(2).getHeader("outgoingInterceptedProp"));
- }
-
- protected JMSServerManager createServerWithStompInterceptor(List<String> stompIncomingInterceptor,
- List<String> stompOutgoingInterceptor) throws Exception {
-
- Map<String, Object> params = new HashMap<>();
- params.put(TransportConstants.PROTOCOLS_PROP_NAME, StompProtocolManagerFactory.STOMP_PROTOCOL_NAME);
- params.put(TransportConstants.PORT_PROP_NAME, TransportConstants.DEFAULT_STOMP_PORT);
- params.put(TransportConstants.STOMP_CONSUMERS_CREDIT, "-1");
- TransportConfiguration stompTransport = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
-
- Configuration config = createBasicConfig().setPersistenceEnabled(false).addAcceptorConfiguration(stompTransport).addAcceptorConfiguration(new TransportConfiguration(INVM_ACCEPTOR_FACTORY)).setIncomingInterceptorClassNames(stompIncomingInterceptor).setOutgoingInterceptorClassNames(stompOutgoingInterceptor);
-
- ActiveMQServer hornetQServer = addServer(ActiveMQServers.newActiveMQServer(config, defUser, defPass));
-
- JMSConfiguration jmsConfig = new JMSConfigurationImpl();
- jmsConfig.getQueueConfigurations().add(new JMSQueueConfigurationImpl().setName(getQueueName()).setDurable(false).setBindings(getQueueName()));
- jmsConfig.getTopicConfigurations().add(new TopicConfigurationImpl().setName(getTopicName()).setBindings(getTopicName()));
- server = new JMSServerManagerImpl(hornetQServer, jmsConfig);
- server.setRegistry(new JndiBindingRegistry(new InVMNamingContext()));
- return server;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java
index 419b339..ac89c1d 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompConnectionCleanupTest.java
@@ -19,32 +19,20 @@ package org.apache.activemq.artemis.tests.integration.stomp;
import javax.jms.Message;
import javax.jms.MessageConsumer;
-import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.jms.server.JMSServerManager;
+import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.junit.Test;
-public class StompConnectionCleanupTest extends StompTestBase {
+public class StompConnectionCleanupTest extends StompTest {
private static final long CONNECTION_TTL = 2000;
// ARTEMIS-231
@Test
public void testConnectionCleanupWithTopicSubscription() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
+ conn.connect(defUser, defPass);
- //We send and consumer a message to ensure a STOMP connection and server session is created
-
- System.out.println("Received frame: " + frame);
-
- assertTrue(frame.startsWith("CONNECTED"));
-
- frame = "SUBSCRIBE\n" + "destination:" + getTopicPrefix() + getTopicName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "DISCONNECT\n\n" + Stomp.NULL;
- sendFrame(frame);
+ subscribeTopic(conn, null, "auto", null);
// Now we wait until the connection is cleared on the server, which will happen some time after ttl, since no data
// is being sent
@@ -72,25 +60,16 @@ public class StompConnectionCleanupTest extends StompTestBase {
@Test
public void testConnectionCleanup() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
+ conn.connect(defUser, defPass);
- //We send and consumer a message to ensure a STOMP connection and server session is created
+ subscribe(conn, null, "auto", null);
- System.out.println("Received frame: " + frame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
- assertTrue(frame.startsWith("CONNECTED"));
+ ClientStompFrame frame = conn.receiveFrame(10000);
- frame = "SUBSCRIBE\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n" + "ack:auto\n\n" + Stomp.NULL;
- sendFrame(frame);
-
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
- sendFrame(frame);
-
- frame = receiveFrame(10000);
- assertTrue(frame.startsWith("MESSAGE"));
- assertTrue(frame.indexOf("destination:") > 0);
+ assertTrue(frame.getCommand().equals("MESSAGE"));
+ assertTrue(frame.getHeader("destination").equals(getQueuePrefix() + getQueueName()));
// Now we wait until the connection is cleared on the server, which will happen some time after ttl, since no data
// is being sent
@@ -118,13 +97,7 @@ public class StompConnectionCleanupTest extends StompTestBase {
@Test
public void testConnectionNotCleanedUp() throws Exception {
- String frame = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
- sendFrame(frame);
- frame = receiveFrame(10000);
-
- //We send and consumer a message to ensure a STOMP connection and server session is created
-
- assertTrue(frame.startsWith("CONNECTED"));
+ conn.connect(defUser, defPass);
MessageConsumer consumer = session.createConsumer(queue);
@@ -136,8 +109,7 @@ public class StompConnectionCleanupTest extends StompTestBase {
while (true) {
//Send and receive a msg
- frame = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n\n" + "Hello World" + Stomp.NULL;
- sendFrame(frame);
+ send(conn, getQueuePrefix() + getQueueName(), null, "Hello World");
Message msg = consumer.receive(1000);
assertNotNull(msg);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
deleted file mode 100644
index 138e37c..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverHttpTest.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.stomp;
-
-import java.nio.charset.StandardCharsets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.DefaultFullHttpRequest;
-import io.netty.handler.codec.http.DefaultHttpContent;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpMethod;
-import io.netty.handler.codec.http.HttpRequestEncoder;
-import io.netty.handler.codec.http.HttpResponseDecoder;
-import io.netty.handler.codec.http.HttpVersion;
-import io.netty.handler.codec.string.StringDecoder;
-import io.netty.handler.codec.string.StringEncoder;
-
-public class StompOverHttpTest extends StompTest {
-
- @Override
- protected void addChannelHandlers(int index, SocketChannel ch) {
- ch.pipeline().addLast(new HttpRequestEncoder());
- ch.pipeline().addLast(new HttpResponseDecoder());
- ch.pipeline().addLast(new HttpHandler());
- ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
- ch.pipeline().addLast("encoder", new StringEncoder(StandardCharsets.UTF_8));
- ch.pipeline().addLast(new StompClientHandler(index));
- }
-
- @Override
- public String receiveFrame(long timeOut) throws Exception {
- //we are request/response so may need to send an empty request so we get responses piggy backed
- sendFrame(new byte[]{});
- return super.receiveFrame(timeOut);
- }
-
- class HttpHandler extends ChannelDuplexHandler {
-
- @Override
- public void channelRead(final ChannelHandlerContext ctx, final Object msg) throws Exception {
- if (msg instanceof DefaultHttpContent) {
- DefaultHttpContent response = (DefaultHttpContent) msg;
- ctx.fireChannelRead(response.content());
- }
- }
-
- @Override
- public void write(final ChannelHandlerContext ctx, final Object msg, ChannelPromise promise) throws Exception {
- if (msg instanceof ByteBuf) {
- ByteBuf buf = (ByteBuf) msg;
- FullHttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, "", buf);
- httpRequest.headers().add(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(buf.readableBytes()));
- ctx.write(httpRequest, promise);
- } else {
- ctx.write(msg, promise);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
deleted file mode 100644
index 95801f7..0000000
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/StompOverWebsocketTest.java
+++ /dev/null
@@ -1,151 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.tests.integration.stomp;
-
-import java.net.URI;
-import java.net.URISyntaxException;
-import java.nio.charset.StandardCharsets;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import io.netty.buffer.Unpooled;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelDuplexHandler;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.socket.SocketChannel;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpClientCodec;
-import io.netty.handler.codec.http.HttpObjectAggregator;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshaker;
-import io.netty.handler.codec.http.websocketx.WebSocketClientHandshakerFactory;
-import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketVersion;
-import io.netty.handler.codec.string.StringDecoder;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-@RunWith(Parameterized.class)
-public class StompOverWebsocketTest extends StompTest {
-
- private ChannelPromise handshakeFuture;
-
- private final boolean useBinaryFrames;
-
- @Parameterized.Parameters(name = "useBinaryFrames={0}")
- public static Collection<Object[]> data() {
- List<Object[]> list = Arrays.asList(new Object[][]{{Boolean.TRUE}, {Boolean.FALSE}});
- return list;
- }
-
- public StompOverWebsocketTest(Boolean useBinaryFrames) {
- super();
- this.useBinaryFrames = useBinaryFrames;
- }
-
- @Override
- protected void addChannelHandlers(int index, SocketChannel ch) throws URISyntaxException {
- ch.pipeline().addLast("http-codec", new HttpClientCodec());
- ch.pipeline().addLast("aggregator", new HttpObjectAggregator(8192));
- ch.pipeline().addLast(new WebsocketHandler(WebSocketClientHandshakerFactory.newHandshaker(new URI("ws://localhost:8080/websocket"), WebSocketVersion.V13, null, false, null)));
- ch.pipeline().addLast("decoder", new StringDecoder(StandardCharsets.UTF_8));
- ch.pipeline().addLast(new StompClientHandler(index));
- }
-
- @Override
- protected void handshake() throws InterruptedException {
- handshakeFuture.sync();
- }
-
- class WebsocketHandler extends ChannelDuplexHandler {
-
- private WebSocketClientHandshaker handshaker;
-
- WebsocketHandler(WebSocketClientHandshaker webSocketClientHandshaker) {
- this.handshaker = webSocketClientHandshaker;
- }
-
- @Override
- public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
- handshakeFuture = ctx.newPromise();
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- handshaker.handshake(ctx.channel());
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- System.out.println("WebSocket Client disconnected!");
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- Channel ch = ctx.channel();
- if (!handshaker.isHandshakeComplete()) {
- handshaker.finishHandshake(ch, (FullHttpResponse) msg);
- System.out.println("WebSocket Client connected!");
- handshakeFuture.setSuccess();
- return;
- }
-
- if (msg instanceof FullHttpResponse) {
- FullHttpResponse response = (FullHttpResponse) msg;
- throw new Exception("Unexpected FullHttpResponse (getStatus=" + response.getStatus() + ", content=" + response.content().toString(StandardCharsets.UTF_8) + ')');
- }
-
- WebSocketFrame frame = (WebSocketFrame) msg;
- if (frame instanceof BinaryWebSocketFrame) {
- BinaryWebSocketFrame dataFrame = (BinaryWebSocketFrame) frame;
- super.channelRead(ctx, dataFrame.content());
- } else if (frame instanceof PongWebSocketFrame) {
- System.out.println("WebSocket Client received pong");
- } else if (frame instanceof CloseWebSocketFrame) {
- System.out.println("WebSocket Client received closing");
- ch.close();
- }
- }
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- try {
- if (msg instanceof String) {
- ctx.write(createFrame((String) msg), promise);
- } else {
- super.write(ctx, msg, promise);
- }
- } catch (Exception e) {
- e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
- }
- }
- }
-
- protected WebSocketFrame createFrame(String msg) {
- if (useBinaryFrames) {
- return new BinaryWebSocketFrame(Unpooled.copiedBuffer(msg, StandardCharsets.UTF_8));
- } else {
- return new TextWebSocketFrame(msg);
- }
- }
-
-}