You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/13 22:02:44 UTC
[5/8] activemq-artemis git commit: ARTEMIS-1511 Enable WebSocket Transport in STOMP test client
ARTEMIS-1511 Enable WebSocket Transport in STOMP test client
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6e5163a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6e5163a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6e5163a
Branch: refs/heads/master
Commit: c6e5163a5189f5584746f774ae5ea97b82751aef
Parents: 5211afd
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Nov 10 12:33:31 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Nov 13 16:55:47 2017 -0500
----------------------------------------------------------------------
.../stomp/util/AbstractClientStompFrame.java | 12 ++
.../util/AbstractStompClientConnection.java | 157 ++++++++++++++-----
.../stomp/util/ClientStompFrame.java | 6 +
.../stomp/util/StompClientConnection.java | 1 +
.../util/StompClientConnectionFactory.java | 31 ++++
.../stomp/util/StompClientConnectionV10.java | 6 +
.../stomp/util/StompClientConnectionV11.java | 19 ++-
.../stomp/util/StompClientConnectionV12.java | 5 +
8 files changed, 191 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
index c48fd8d..2f8a11f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
public abstract class AbstractClientStompFrame implements ClientStompFrame {
@@ -88,6 +90,16 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
return toByteBufferInternal(str);
}
+ @Override
+ public ByteBuf toNettyByteBuf() {
+ return Unpooled.copiedBuffer(toByteBuffer());
+ }
+
+ @Override
+ public ByteBuf toNettyByteBufWithExtras(String str) {
+ return Unpooled.copiedBuffer(toByteBufferWithExtra(str));
+ }
+
public ByteBuffer toByteBufferInternal(String str) {
StringBuffer sb = new StringBuffer();
sb.append(command + EOL);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
index 707c8a1..78c9c4b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
@@ -17,9 +17,8 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.URI;
import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
@@ -27,8 +26,15 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.netty.NettyTransport;
+import org.apache.activemq.transport.netty.NettyTransportFactory;
+import org.apache.activemq.transport.netty.NettyTransportListener;
public abstract class AbstractStompClientConnection implements StompClientConnection {
@@ -39,41 +45,53 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
protected String username;
protected String passcode;
protected StompFrameFactory factory;
- protected final SocketChannel socketChannel;
+ protected NettyTransport transport;
protected ByteBuffer readBuffer;
protected List<Byte> receiveList;
protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>();
protected boolean connected = false;
protected int serverPingCounter;
- protected ReaderThread readerThread;
+ //protected ReaderThread readerThread;
+ protected String scheme;
+ @Deprecated
public AbstractStompClientConnection(String version, String host, int port) throws IOException {
this.version = version;
this.host = host;
this.port = port;
+ this.scheme = "tcp";
+
this.factory = StompFrameFactoryFactory.getFactory(version);
- socketChannel = SocketChannel.open();
- initSocket();
}
- private void initSocket() throws IOException {
- socketChannel.configureBlocking(true);
- InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
- socketChannel.connect(remoteAddr);
-
- startReaderThread();
- }
+ public AbstractStompClientConnection(URI uri) throws Exception {
+ parseURI(uri);
+ this.factory = StompFrameFactoryFactory.getFactory(version);
- private void startReaderThread() {
readBuffer = ByteBuffer.allocateDirect(10240);
receiveList = new ArrayList<>(10240);
- readerThread = new ReaderThread();
- readerThread.start();
+ transport = NettyTransportFactory.createTransport(uri);
+ transport.setTransportListener(new StompTransportListener());
+ transport.connect();
+
+ Wait.waitFor(new Wait.Condition() {
+ @Override
+ public boolean isSatisfied() throws Exception {
+ return transport.isConnected();
+ }
+ }, 10000);
+
+ if (!transport.isConnected()) {
+ throw new RuntimeException("Could not connect transport");
+ }
}
- public void killReaderThread() {
- readerThread.stop();
+ private void parseURI(URI uri) {
+ scheme = uri.getScheme() == null ? "tcp" : uri.getScheme();
+ host = uri.getHost();
+ port = uri.getPort();
+ this.version = StompClientConnectionFactory.getStompVersionFromURI(uri);
}
private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException {
@@ -85,8 +103,17 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
} else {
buffer = frame.toByteBuffer();
}
- while (buffer.remaining() > 0) {
- socketChannel.write(buffer);
+
+ ByteBuf buf = Unpooled.copiedBuffer(buffer);
+
+ try {
+ buf.retain();
+ ChannelFuture future = transport.send(buf);
+ if (future != null) {
+ future.awaitUninterruptibly();
+ }
+ } finally {
+ buf.release();
}
//now response
@@ -179,35 +206,78 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
}
protected void close() throws IOException {
- socketChannel.close();
+ transport.close();
}
- private class ReaderThread extends Thread {
+ private class StompTransportListener implements NettyTransportListener {
+ /**
+ * Called when new incoming data has become available.
+ *
+ * @param incoming the next incoming packet of data.
+ */
@Override
- public void run() {
- try {
- int n = socketChannel.read(readBuffer);
-
- while (n >= 0) {
- if (n > 0) {
- receiveBytes(n);
- }
- n = socketChannel.read(readBuffer);
- }
- //peer closed
- close();
-
- } catch (IOException e) {
- try {
- close();
- } catch (IOException e1) {
- //ignore
+ public void onData(ByteBuf incoming) {
+ while (incoming.readableBytes() > 0) {
+ int bytes = incoming.readableBytes();
+ if (incoming.readableBytes() < readBuffer.remaining()) {
+ ByteBuffer byteBuffer = ByteBuffer.allocate(incoming.readableBytes());
+ incoming.readBytes(byteBuffer);
+ byteBuffer.rewind();
+ readBuffer.put(byteBuffer);
+ receiveBytes(bytes);
+ } else {
+ incoming.readBytes(readBuffer);
+ receiveBytes(bytes - incoming.readableBytes());
}
}
}
+
+ /**
+ * Called if the connection state becomes closed.
+ */
+ @Override
+ public void onTransportClosed() {
+ }
+
+ /**
+ * Called when an error occurs during normal Transport operations.
+ *
+ * @param cause the error that triggered this event.
+ */
+ @Override
+ public void onTransportError(Throwable cause) {
+ throw new RuntimeException(cause);
+ }
}
+// private class ReaderThread extends Thread {
+//
+// @Override
+// public void run() {
+// try {
+// transport.setTransportListener();
+// int n = Z..read(readBuffer);
+//
+// while (n >= 0) {
+// if (n > 0) {
+// receiveBytes(n);
+// }
+// n = socketChannel.read(readBuffer);
+// }
+// //peer closed
+// close();
+//
+// } catch (IOException e) {
+// try {
+// close();
+// } catch (IOException e1) {
+// //ignore
+// }
+// }
+// }
+// }
+
@Override
public ClientStompFrame connect() throws Exception {
return connect(null, null);
@@ -230,7 +300,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
@Override
public boolean isConnected() {
- return connected && socketChannel.isConnected();
+ return connected && transport.isConnected();
}
@Override
@@ -243,6 +313,11 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
return this.frameQueue.size();
}
+ @Override
+ public void closeTransport() throws IOException {
+ transport.close();
+ }
+
protected class Pinger extends Thread {
long pingInterval;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
index 93801f9..1b77e12 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.nio.ByteBuffer;
+import io.netty.buffer.ByteBuf;
+
/**
* pls use factory to create frames.
*/
@@ -25,6 +27,8 @@ public interface ClientStompFrame {
ByteBuffer toByteBuffer();
+ ByteBuf toNettyByteBuf();
+
boolean needsReply();
ClientStompFrame setCommand(String command);
@@ -41,6 +45,8 @@ public interface ClientStompFrame {
ByteBuffer toByteBufferWithExtra(String str);
+ ByteBuf toNettyByteBufWithExtras(String str);
+
boolean isPing();
ClientStompFrame setForceOneway();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
index 7be09a5..012bb49 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
@@ -53,5 +53,6 @@ public interface StompClientConnection {
int getServerPingNumber();
+ void closeTransport() throws IOException;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java
index 3a40c99..06d1845 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java
@@ -17,9 +17,12 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
+import java.net.URI;
public class StompClientConnectionFactory {
+ public static final String LATEST_VERSION = "1.2";
+
//create a raw connection to the host.
public static StompClientConnection createClientConnection(String version,
String host,
@@ -36,6 +39,34 @@ public class StompClientConnectionFactory {
return null;
}
+ public static StompClientConnection createClientConnection(URI uri) throws Exception {
+ String version = getStompVersionFromURI(uri);
+ if ("1.0".equals(version)) {
+ return new StompClientConnectionV10(uri);
+ }
+ if ("1.1".equals(version)) {
+ return new StompClientConnectionV11(uri);
+ }
+ if ("1.2".equals(version)) {
+ return new StompClientConnectionV12(uri);
+ }
+ return null;
+ }
+
+ public static String getStompVersionFromURI(URI uri) {
+ String scheme = uri.getScheme();
+ if (scheme.contains("10")) {
+ return "1.0";
+ }
+ if (scheme.contains("11")) {
+ return "1.1";
+ }
+ if (scheme.contains("12")) {
+ return "1.2";
+ }
+ return LATEST_VERSION;
+ }
+
public static void main(String[] args) throws Exception {
StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", "localhost", 61613);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
index d32823b..56c72db 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
@@ -17,12 +17,14 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
+import java.net.URI;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
public class StompClientConnectionV10 extends AbstractStompClientConnection {
+
public StompClientConnectionV10(String host, int port) throws IOException {
super("1.0", host, port);
}
@@ -31,6 +33,10 @@ public class StompClientConnectionV10 extends AbstractStompClientConnection {
super(version, host, port);
}
+ public StompClientConnectionV10(URI uri) throws Exception {
+ super(uri);
+ }
+
@Override
public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
return connect(username, passcode, null);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
index 6ef88cb..5f0cca3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
+import java.net.URI;
import java.util.UUID;
import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
@@ -31,6 +32,10 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 {
super(version, host, port);
}
+ public StompClientConnectionV11(URI uri) throws Exception {
+ super(uri);
+ }
+
@Override
public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
@@ -96,12 +101,16 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 {
frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
- ClientStompFrame result = this.sendFrame(frame);
-
- if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
- throw new IOException("Disconnect failed! " + result);
+ try {
+ if (!transport.isConnected()) {
+ ClientStompFrame result = this.sendFrame(frame);
+ if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
+ throw new IOException("Disconnect failed! " + result);
+ }
+ }
+ } catch (Exception e) {
+ // Transport may have been closed
}
-
close();
connected = false;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
index 2d8f354..afa1f08 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.tests.integration.stomp.util;
import java.io.IOException;
+import java.net.URI;
public class StompClientConnectionV12 extends StompClientConnectionV11 {
@@ -24,6 +25,10 @@ public class StompClientConnectionV12 extends StompClientConnectionV11 {
super("1.2", host, port);
}
+ public StompClientConnectionV12(URI uri) throws Exception {
+ super(uri);
+ }
+
public ClientStompFrame createAnyFrame(String command) {
return factory.newAnyFrame(command);
}