You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2016/11/11 19:23:35 UTC
[05/11] activemq-artemis git commit: Stomp refactor + track autocreation for addresses
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
index 7a1a529..d32823b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
@@ -18,52 +18,47 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
-/**
- * pls use factory to create frames.
- */
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+
public class StompClientConnectionV10 extends AbstractStompClientConnection {
public StompClientConnectionV10(String host, int port) throws IOException {
super("1.0", host, port);
}
+ public StompClientConnectionV10(String version, String host, int port) throws IOException {
+ super(version, host, port);
+ }
+
@Override
public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
- ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
- frame.addHeader(LOGIN_HEADER, username);
- frame.addHeader(PASSCODE_HEADER, passcode);
-
- ClientStompFrame response = this.sendFrame(frame);
-
- if (response.getCommand().equals(CONNECTED_COMMAND)) {
- connected = true;
- } else {
- System.out.println("Connection failed with: " + response);
- connected = false;
- }
- return response;
+ return connect(username, passcode, null);
}
@Override
- public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
- ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
- frame.addHeader(LOGIN_HEADER, username);
- frame.addHeader(PASSCODE_HEADER, passcode);
- frame.addHeader(CLIENT_ID_HEADER, clientID);
+ public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
+ ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
+ frame.addHeader(Stomp.Headers.Connect.LOGIN, username);
+ frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode);
+ if (clientID != null) {
+ frame.addHeader(Stomp.Headers.Connect.CLIENT_ID, clientID);
+ }
ClientStompFrame response = this.sendFrame(frame);
- if (response.getCommand().equals(CONNECTED_COMMAND)) {
+ if (response.getCommand().equals(Stomp.Responses.CONNECTED)) {
connected = true;
} else {
- System.out.println("Connection failed with: " + response);
+ IntegrationTestLogger.LOGGER.warn("Connection failed with: " + response);
connected = false;
}
+ return response;
}
@Override
public void disconnect() throws IOException, InterruptedException {
- ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
+ ClientStompFrame frame = factory.newFrame(Stomp.Commands.DISCONNECT);
this.sendFrame(frame);
close();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
index aa07145..cfc8f92 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
@@ -17,55 +17,40 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
+import java.util.UUID;
-public class StompClientConnectionV11 extends AbstractStompClientConnection {
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+
+public class StompClientConnectionV11 extends StompClientConnectionV10 {
public StompClientConnectionV11(String host, int port) throws IOException {
super("1.1", host, port);
}
- @Override
- public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
- ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
- frame.addHeader(ACCEPT_HEADER, "1.1");
- frame.addHeader(HOST_HEADER, "localhost");
- if (username != null) {
- frame.addHeader(LOGIN_HEADER, username);
- frame.addHeader(PASSCODE_HEADER, passcode);
- }
-
- ClientStompFrame response = this.sendFrame(frame);
-
- if (response.getCommand().equals(CONNECTED_COMMAND)) {
- String version = response.getHeader(VERSION_HEADER);
- assert (version.equals("1.1"));
-
- this.username = username;
- this.passcode = passcode;
- this.connected = true;
- } else {
- connected = false;
- }
- return response;
+ public StompClientConnectionV11(String version, String host, int port) throws IOException {
+ super(version, host, port);
}
@Override
- public void connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
- ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
- frame.addHeader(ACCEPT_HEADER, "1.1");
- frame.addHeader(HOST_HEADER, "localhost");
- frame.addHeader(CLIENT_ID_HEADER, clientID);
+ public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
+ ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
+ frame.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, getVersion());
+ frame.addHeader(Stomp.Headers.Connect.HOST, "localhost");
+ if (clientID != null) {
+ frame.addHeader(Stomp.Headers.Connect.CLIENT_ID, clientID);
+ }
if (username != null) {
- frame.addHeader(LOGIN_HEADER, username);
- frame.addHeader(PASSCODE_HEADER, passcode);
+ frame.addHeader(Stomp.Headers.Connect.LOGIN, username);
+ frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode);
}
ClientStompFrame response = this.sendFrame(frame);
- if (response.getCommand().equals(CONNECTED_COMMAND)) {
- String version = response.getHeader(VERSION_HEADER);
- assert (version.equals("1.1"));
+ if (Stomp.Responses.CONNECTED.equals(response.getCommand())) {
+ String version = response.getHeader(Stomp.Headers.Connected.VERSION);
+ if (!version.equals(getVersion()))
+ throw new IllegalStateException("incorrect version!");
this.username = username;
this.passcode = passcode;
@@ -73,22 +58,24 @@ public class StompClientConnectionV11 extends AbstractStompClientConnection {
} else {
connected = false;
}
+ return response;
}
public void connect1(String username, String passcode) throws IOException, InterruptedException {
- ClientStompFrame frame = factory.newFrame(STOMP_COMMAND);
- frame.addHeader(ACCEPT_HEADER, "1.0,1.1");
- frame.addHeader(HOST_HEADER, "127.0.0.1");
+ ClientStompFrame frame = factory.newFrame(Stomp.Commands.STOMP);
+ frame.addHeader(Stomp.Headers.Connect.ACCEPT_VERSION, "1.0,1.1");
+ frame.addHeader(Stomp.Headers.Connect.HOST, "127.0.0.1");
if (username != null) {
- frame.addHeader(LOGIN_HEADER, username);
- frame.addHeader(PASSCODE_HEADER, passcode);
+ frame.addHeader(Stomp.Headers.Connect.LOGIN, username);
+ frame.addHeader(Stomp.Headers.Connect.PASSCODE, passcode);
}
ClientStompFrame response = this.sendFrame(frame);
- if (response.getCommand().equals(CONNECTED_COMMAND)) {
- String version = response.getHeader(VERSION_HEADER);
- assert (version.equals("1.1"));
+ if (Stomp.Responses.CONNECTED.equals(response.getCommand())) {
+ String version = response.getHeader(Stomp.Headers.Connected.VERSION);
+ if (!version.equals(getVersion()))
+ throw new IllegalStateException("incorrect version!");
this.username = username;
this.passcode = passcode;
@@ -103,12 +90,15 @@ public class StompClientConnectionV11 extends AbstractStompClientConnection {
public void disconnect() throws IOException, InterruptedException {
stopPinger();
- ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
- frame.addHeader("receipt", "1");
+ ClientStompFrame frame = factory.newFrame(Stomp.Commands.DISCONNECT);
+
+ String uuid = UUID.randomUUID().toString();
+
+ frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
ClientStompFrame result = this.sendFrame(frame);
- if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
+ if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
throw new IOException("Disconnect failed! " + result);
}
@@ -122,4 +112,28 @@ public class StompClientConnectionV11 extends AbstractStompClientConnection {
return factory.newFrame(command);
}
+ @Override
+ public void startPinger(long interval) {
+ pinger = new Pinger(interval);
+ pinger.startPing();
+ }
+
+ @Override
+ public void stopPinger() {
+ if (pinger != null) {
+ pinger.stopPing();
+ try {
+ pinger.join();
+ } catch (InterruptedException e) {
+ e.printStackTrace();
+ }
+ pinger = null;
+ }
+ }
+
+ @Override
+ public int getServerPingNumber() {
+ return serverPingCounter;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
index fb77832..2d8f354 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
@@ -18,90 +18,13 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
-public class StompClientConnectionV12 extends AbstractStompClientConnection {
+public class StompClientConnectionV12 extends StompClientConnectionV11 {
public StompClientConnectionV12(String host, int port) throws IOException {
super("1.2", host, port);
}
- @Override
- public ClientStompFrame createFrame(String command) {
- return factory.newFrame(command);
- }
-
- @Override
- public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
- ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
- frame.addHeader(ACCEPT_HEADER, "1.2");
- frame.addHeader(HOST_HEADER, "localhost");
- if (username != null) {
- frame.addHeader(LOGIN_HEADER, username);
- frame.addHeader(PASSCODE_HEADER, passcode);
- }
-
- ClientStompFrame response = this.sendFrame(frame);
-
- if (response.getCommand().equals(CONNECTED_COMMAND)) {
- String version = response.getHeader(VERSION_HEADER);
- if (!version.equals("1.2"))
- throw new IllegalStateException("incorrect version!");
-
- this.username = username;
- this.passcode = passcode;
- this.connected = true;
- } else {
- connected = false;
- }
- return response;
- }
-
- @Override
- public void disconnect() throws IOException, InterruptedException {
- stopPinger();
-
- ClientStompFrame frame = factory.newFrame(DISCONNECT_COMMAND);
- frame.addHeader("receipt", "1");
-
- ClientStompFrame result = this.sendFrame(frame);
-
- if (result == null || (!"RECEIPT".equals(result.getCommand())) || (!"1".equals(result.getHeader("receipt-id")))) {
- throw new IOException("Disconnect failed! " + result);
- }
-
- close();
-
- connected = false;
- }
-
- @Override
- public void connect(String username, String passcode, String clientID) throws Exception {
- ClientStompFrame frame = factory.newFrame(CONNECT_COMMAND);
- frame.addHeader(ACCEPT_HEADER, "1.2");
- frame.addHeader(HOST_HEADER, "localhost");
- frame.addHeader(CLIENT_ID_HEADER, clientID);
-
- if (username != null) {
- frame.addHeader(LOGIN_HEADER, username);
- frame.addHeader(PASSCODE_HEADER, passcode);
- }
-
- ClientStompFrame response = this.sendFrame(frame);
-
- if (response.getCommand().equals(CONNECTED_COMMAND)) {
- String version = response.getHeader(VERSION_HEADER);
- if (!version.equals("1.2"))
- throw new IllegalStateException("incorrect version!");
-
- this.username = username;
- this.passcode = passcode;
- this.connected = true;
- } else {
- connected = false;
- }
- }
-
public ClientStompFrame createAnyFrame(String command) {
return factory.newAnyFrame(command);
}
-
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java
index 3ec03cf..1c78c5a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactory.java
@@ -24,4 +24,6 @@ public interface StompFrameFactory {
ClientStompFrame newAnyFrame(String command);
+ String[] handleHeaders(String header);
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java
index 5ed8566..8813fd6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV10.java
@@ -38,18 +38,18 @@ import java.util.StringTokenizer;
public class StompFrameFactoryV10 implements StompFrameFactory {
@Override
- public ClientStompFrame createFrame(String data) {
+ public ClientStompFrame createFrame(final String data) {
//split the string at "\n\n"
String[] dataFields = data.split("\n\n");
StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
String command = tokenizer.nextToken();
- ClientStompFrame frame = new ClientStompFrameV10(command);
+ ClientStompFrame frame = newFrame(command);
while (tokenizer.hasMoreTokens()) {
String header = tokenizer.nextToken();
- String[] fields = header.split(":");
+ String[] fields = handleHeaders(header);
frame.addHeader(fields[0], fields[1]);
}
@@ -61,6 +61,11 @@ public class StompFrameFactoryV10 implements StompFrameFactory {
}
@Override
+ public String[] handleHeaders(String header) {
+ return header.split(":");
+ }
+
+ @Override
public ClientStompFrame newFrame(String command) {
return new ClientStompFrameV10(command);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java
index 4de0d7d..807b3f0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV11.java
@@ -36,32 +36,10 @@ import java.util.StringTokenizer;
* 13. RECEIPT
* 14. ERROR
*/
-public class StompFrameFactoryV11 implements StompFrameFactory {
+public class StompFrameFactoryV11 extends StompFrameFactoryV10 {
@Override
- public ClientStompFrame createFrame(final String data) {
- //split the string at "\n\n"
- String[] dataFields = data.split("\n\n");
-
- StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
-
- String command = tokenizer.nextToken();
- ClientStompFrame frame = new ClientStompFrameV11(command);
-
- while (tokenizer.hasMoreTokens()) {
- String header = tokenizer.nextToken();
- String[] fields = splitAndDecodeHeader(header);
- frame.addHeader(fields[0], fields[1]);
- }
-
- //body (without null byte)
- if (dataFields.length == 2) {
- frame.setBody(dataFields[1]);
- }
- return frame;
- }
-
- private String[] splitAndDecodeHeader(String header) {
+ public String[] handleHeaders(String header) {
// split the header into the key and value at the ":" since there shouldn't be any unescaped colons in the header
// except for the one separating the key and value
String[] result = header.split(":");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java
index 5223b4e..dbd32e0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompFrameFactoryV12.java
@@ -18,42 +18,10 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.util.StringTokenizer;
-public class StompFrameFactoryV12 implements StompFrameFactory {
+public class StompFrameFactoryV12 extends StompFrameFactoryV11 {
@Override
- public ClientStompFrame createFrame(String data) {
- //split the string at "\n\n"
- String[] dataFields = data.split("\n\n");
-
- StringTokenizer tokenizer = new StringTokenizer(dataFields[0], "\n");
-
- String command = tokenizer.nextToken();
- ClientStompFrame frame = new ClientStompFrameV12(command);
-
- while (tokenizer.hasMoreTokens()) {
- String header = tokenizer.nextToken();
- String[] fields = splitAndDecodeHeader(header);
- frame.addHeader(fields[0], fields[1]);
- }
-
- //body (without null byte)
- if (dataFields.length == 2) {
- frame.setBody(dataFields[1]);
- }
- return frame;
- }
-
- public void printByteHeader(String headers) {
- StringBuffer buffer = new StringBuffer();
-
- for (int i = 0; i < headers.length(); i++) {
- char c = headers.charAt(i);
- buffer.append((byte) c + " ");
- }
- System.out.println("header in byte : " + buffer.toString());
- }
-
- private String[] splitAndDecodeHeader(String header) {
+ public String[] handleHeaders(String header) {
// split the header into the key and value at the ":" since there shouldn't be any unescaped colons in the header
// except for the one separating the key and value
String[] result = header.split(":");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/84df373a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java
index a5d3068..5414a9f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/v11/ExtraStompTest.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.v11;
import java.nio.charset.StandardCharsets;
+import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
+import org.apache.activemq.artemis.tests.integration.stomp.StompTestBase;
import org.apache.activemq.artemis.tests.integration.stomp.util.ClientStompFrame;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnection;
import org.apache.activemq.artemis.tests.integration.stomp.util.StompClientConnectionFactory;
@@ -28,15 +30,18 @@ import org.junit.Test;
/*
* Some Stomp tests against server with persistence enabled are put here.
*/
-public class ExtraStompTest extends StompV11TestBase {
+public class ExtraStompTest extends StompTestBase {
private StompClientConnection connV10;
private StompClientConnection connV11;
+ public boolean isPersistenceEnabled() {
+ return true;
+ }
+
@Override
@Before
public void setUp() throws Exception {
- persistenceEnabled = true;
super.setUp();
connV10 = StompClientConnectionFactory.createClientConnection("1.0", hostname, port);
connV10.connect(defUser, defPass);
@@ -57,331 +62,142 @@ public class ExtraStompTest extends StompV11TestBase {
@Test
public void testSendAndReceive10() throws Exception {
- String msg1 = "Hello World 1!";
- String msg2 = "Hello World 2!";
-
- ClientStompFrame frame = connV10.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", String.valueOf(msg1.getBytes(StandardCharsets.UTF_8).length));
- frame.addHeader("persistent", "true");
- frame.setBody(msg1);
-
- connV10.sendFrame(frame);
-
- ClientStompFrame frame2 = connV10.createFrame("SEND");
- frame2.addHeader("destination", getQueuePrefix() + getQueueName());
- frame2.addHeader("content-length", String.valueOf(msg2.getBytes(StandardCharsets.UTF_8).length));
- frame2.addHeader("persistent", "true");
- frame2.setBody(msg2);
-
- connV10.sendFrame(frame2);
-
- ClientStompFrame subFrame = connV10.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- connV10.sendFrame(subFrame);
-
- frame = connV10.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- assertEquals("a-sub", frame.getHeader("subscription"));
-
- assertNotNull(frame.getHeader("message-id"));
-
- assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
-
- assertEquals(msg1, frame.getBody());
-
- frame = connV10.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- assertEquals("a-sub", frame.getHeader("subscription"));
-
- assertNotNull(frame.getHeader("message-id"));
-
- assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
-
- assertEquals(msg2, frame.getBody());
-
- //unsub
- ClientStompFrame unsubFrame = connV10.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV10.sendFrame(unsubFrame);
-
+ testSendAndReceive(connV10);
}
@Test
public void testSendAndReceive11() throws Exception {
+ testSendAndReceive(connV11);
+ }
+
+ public void testSendAndReceive(StompClientConnection conn) throws Exception {
String msg1 = "Hello World 1!";
String msg2 = "Hello World 2!";
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", String.valueOf(msg1.getBytes(StandardCharsets.UTF_8).length));
- frame.addHeader("persistent", "true");
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND);
+ frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(msg1.getBytes(StandardCharsets.UTF_8).length));
+ frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
frame.setBody(msg1);
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
- ClientStompFrame frame2 = connV11.createFrame("SEND");
- frame2.addHeader("destination", getQueuePrefix() + getQueueName());
- frame2.addHeader("content-length", String.valueOf(msg2.getBytes(StandardCharsets.UTF_8).length));
- frame2.addHeader("persistent", "true");
+ ClientStompFrame frame2 = conn.createFrame(Stomp.Commands.SEND);
+ frame2.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+ frame2.addHeader(Stomp.Headers.CONTENT_LENGTH, String.valueOf(msg2.getBytes(StandardCharsets.UTF_8).length));
+ frame2.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
frame2.setBody(msg2);
- connV11.sendFrame(frame2);
-
- ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- connV11.sendFrame(subFrame);
-
- frame = connV11.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
+ conn.sendFrame(frame2);
- assertEquals("a-sub", frame.getHeader("subscription"));
+ subscribe(conn, "a-sub");
- assertNotNull(frame.getHeader("message-id"));
-
- assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+ frame = conn.receiveFrame();
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ assertEquals("a-sub", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+ assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Subscribe.DESTINATION));
assertEquals(msg1, frame.getBody());
- frame = connV11.receiveFrame();
-
- System.out.println("received " + frame);
-
- assertEquals("MESSAGE", frame.getCommand());
-
- assertEquals("a-sub", frame.getHeader("subscription"));
-
- assertNotNull(frame.getHeader("message-id"));
-
- assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader("destination"));
+ frame = conn.receiveFrame();
+ assertEquals(Stomp.Responses.MESSAGE, frame.getCommand());
+ assertEquals("a-sub", frame.getHeader(Stomp.Headers.Message.SUBSCRIPTION));
+ assertNotNull(frame.getHeader(Stomp.Headers.Message.MESSAGE_ID));
+ assertEquals(getQueuePrefix() + getQueueName(), frame.getHeader(Stomp.Headers.Subscribe.DESTINATION));
assertEquals(msg2, frame.getBody());
- //unsub
- ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV11.sendFrame(unsubFrame);
+ unsubscribe(conn, "a-sub");
}
@Test
public void testNoGarbageAfterPersistentMessageV10() throws Exception {
- ClientStompFrame subFrame = connV10.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- connV10.sendFrame(subFrame);
+ testNoGarbageAfterPersistentMessage(connV10);
+ }
- ClientStompFrame frame = connV10.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", "11");
- frame.addHeader("persistent", "true");
+ @Test
+ public void testNoGarbageAfterPersistentMessageV11() throws Exception {
+ testNoGarbageAfterPersistentMessage(connV11);
+ }
+
+ public void testNoGarbageAfterPersistentMessage(StompClientConnection conn) throws Exception {
+ subscribe(conn, "a-sub");
+
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND);
+ frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11");
+ frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
frame.setBody("Hello World");
- connV10.sendFrame(frame);
+ conn.sendFrame(frame);
- frame = connV10.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", "11");
- frame.addHeader("persistent", "true");
+ frame = conn.createFrame(Stomp.Commands.SEND);
+ frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11");
+ frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
frame.setBody("Hello World");
- connV10.sendFrame(frame);
-
- frame = connV10.receiveFrame(10000);
+ conn.sendFrame(frame);
- System.out.println("received: " + frame);
+ frame = conn.receiveFrame(10000);
assertEquals("Hello World", frame.getBody());
//if activemq sends trailing garbage bytes, the second message
//will not be normal
- frame = connV10.receiveFrame(10000);
-
- System.out.println("received: " + frame);
+ frame = conn.receiveFrame(10000);
assertEquals("Hello World", frame.getBody());
- //unsub
- ClientStompFrame unsubFrame = connV10.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV10.sendFrame(unsubFrame);
-
+ unsubscribe(conn, "a-sub");
}
@Test
public void testNoGarbageOnPersistentRedeliveryV10() throws Exception {
-
- ClientStompFrame frame = connV10.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", "11");
- frame.addHeader("persistent", "true");
- frame.setBody("Hello World");
-
- connV10.sendFrame(frame);
-
- frame = connV10.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", "11");
- frame.addHeader("persistent", "true");
- frame.setBody("Hello World");
-
- connV10.sendFrame(frame);
-
- ClientStompFrame subFrame = connV10.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "client");
-
- connV10.sendFrame(subFrame);
-
- // receive but don't ack
- frame = connV10.receiveFrame(10000);
- frame = connV10.receiveFrame(10000);
-
- System.out.println("received: " + frame);
-
- //unsub
- ClientStompFrame unsubFrame = connV10.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV10.sendFrame(unsubFrame);
-
- subFrame = connV10.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- connV10.sendFrame(subFrame);
-
- frame = connV10.receiveFrame(10000);
- frame = connV10.receiveFrame(10000);
-
- //second receive will get problem if trailing bytes
- assertEquals("Hello World", frame.getBody());
-
- System.out.println("received again: " + frame);
-
- //unsub
- unsubFrame = connV10.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV10.sendFrame(unsubFrame);
+ testNoGarbageOnPersistentRedelivery(connV10);
}
@Test
- public void testNoGarbageAfterPersistentMessageV11() throws Exception {
- ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
-
- connV11.sendFrame(subFrame);
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", "11");
- frame.addHeader("persistent", "true");
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
-
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", "11");
- frame.addHeader("persistent", "true");
- frame.setBody("Hello World");
-
- connV11.sendFrame(frame);
- frame = connV11.receiveFrame(10000);
-
- System.out.println("received: " + frame);
-
- assertEquals("Hello World", frame.getBody());
-
- //if activemq sends trailing garbage bytes, the second message
- //will not be normal
- frame = connV11.receiveFrame(10000);
-
- System.out.println("received: " + frame);
-
- assertEquals("Hello World", frame.getBody());
-
- //unsub
- ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV11.sendFrame(unsubFrame);
+ public void testNoGarbageOnPersistentRedeliveryV11() throws Exception {
+ testNoGarbageOnPersistentRedelivery(connV11);
}
- @Test
- public void testNoGarbageOnPersistentRedeliveryV11() throws Exception {
- ClientStompFrame frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", "11");
- frame.addHeader("persistent", "true");
+ public void testNoGarbageOnPersistentRedelivery(StompClientConnection conn) throws Exception {
+ ClientStompFrame frame = conn.createFrame(Stomp.Commands.SEND);
+ frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11");
+ frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
frame.setBody("Hello World");
- connV11.sendFrame(frame);
+ conn.sendFrame(frame);
- frame = connV11.createFrame("SEND");
- frame.addHeader("destination", getQueuePrefix() + getQueueName());
- frame.addHeader("content-length", "11");
- frame.addHeader("persistent", "true");
+ frame = conn.createFrame(Stomp.Commands.SEND);
+ frame.addHeader(Stomp.Headers.Subscribe.DESTINATION, getQueuePrefix() + getQueueName());
+ frame.addHeader(Stomp.Headers.CONTENT_LENGTH, "11");
+ frame.addHeader(Stomp.Headers.Send.PERSISTENT, Boolean.TRUE.toString());
frame.setBody("Hello World");
- connV11.sendFrame(frame);
-
- ClientStompFrame subFrame = connV11.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "client");
+ conn.sendFrame(frame);
- connV11.sendFrame(subFrame);
+ subscribe(conn, "a-sub", Stomp.Headers.Subscribe.AckModeValues.CLIENT);
// receive but don't ack
- frame = connV11.receiveFrame(10000);
- frame = connV11.receiveFrame(10000);
-
- System.out.println("received: " + frame);
+ frame = conn.receiveFrame(10000);
+ frame = conn.receiveFrame(10000);
- //unsub
- ClientStompFrame unsubFrame = connV11.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV11.sendFrame(unsubFrame);
-
- subFrame = connV11.createFrame("SUBSCRIBE");
- subFrame.addHeader("id", "a-sub");
- subFrame.addHeader("destination", getQueuePrefix() + getQueueName());
- subFrame.addHeader("ack", "auto");
+ unsubscribe(conn, "a-sub");
- connV11.sendFrame(subFrame);
+ subscribe(conn, "a-sub");
- frame = connV11.receiveFrame(10000);
- frame = connV11.receiveFrame(10000);
+ frame = conn.receiveFrame(10000);
+ frame = conn.receiveFrame(10000);
//second receive will get problem if trailing bytes
assertEquals("Hello World", frame.getBody());
- System.out.println("received again: " + frame);
-
//unsub
- unsubFrame = connV11.createFrame("UNSUBSCRIBE");
- unsubFrame.addHeader("id", "a-sub");
- connV11.sendFrame(unsubFrame);
+ unsubscribe(conn, "a-sub");
}
}