You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2017/11/13 22:02:44 UTC

[5/8] activemq-artemis git commit: ARTEMIS-1511 Enable WebSocket Transport in STOMP test client

ARTEMIS-1511 Enable WebSocket Transport in STOMP test client


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c6e5163a
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c6e5163a
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c6e5163a

Branch: refs/heads/master
Commit: c6e5163a5189f5584746f774ae5ea97b82751aef
Parents: 5211afd
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Nov 10 12:33:31 2017 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Nov 13 16:55:47 2017 -0500

----------------------------------------------------------------------
 .../stomp/util/AbstractClientStompFrame.java    |  12 ++
 .../util/AbstractStompClientConnection.java     | 157 ++++++++++++++-----
 .../stomp/util/ClientStompFrame.java            |   6 +
 .../stomp/util/StompClientConnection.java       |   1 +
 .../util/StompClientConnectionFactory.java      |  31 ++++
 .../stomp/util/StompClientConnectionV10.java    |   6 +
 .../stomp/util/StompClientConnectionV11.java    |  19 ++-
 .../stomp/util/StompClientConnectionV12.java    |   5 +
 8 files changed, 191 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
index c48fd8d..2f8a11f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractClientStompFrame.java
@@ -24,6 +24,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 
 public abstract class AbstractClientStompFrame implements ClientStompFrame {
@@ -88,6 +90,16 @@ public abstract class AbstractClientStompFrame implements ClientStompFrame {
       return toByteBufferInternal(str);
    }
 
+   @Override
+   public ByteBuf toNettyByteBuf() {
+      return Unpooled.copiedBuffer(toByteBuffer());
+   }
+
+   @Override
+   public ByteBuf toNettyByteBufWithExtras(String str) {
+      return Unpooled.copiedBuffer(toByteBufferWithExtra(str));
+   }
+
    public ByteBuffer toByteBufferInternal(String str) {
       StringBuffer sb = new StringBuffer();
       sb.append(command + EOL);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
index 707c8a1..78c9c4b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/AbstractStompClientConnection.java
@@ -17,9 +17,8 @@
 package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
-import java.net.InetSocketAddress;
+import java.net.URI;
 import java.nio.ByteBuffer;
-import java.nio.channels.SocketChannel;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.List;
@@ -27,8 +26,15 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.artemis.tests.util.Wait;
+import org.apache.activemq.transport.netty.NettyTransport;
+import org.apache.activemq.transport.netty.NettyTransportFactory;
+import org.apache.activemq.transport.netty.NettyTransportListener;
 
 public abstract class AbstractStompClientConnection implements StompClientConnection {
 
@@ -39,41 +45,53 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
    protected String username;
    protected String passcode;
    protected StompFrameFactory factory;
-   protected final SocketChannel socketChannel;
+   protected NettyTransport transport;
    protected ByteBuffer readBuffer;
    protected List<Byte> receiveList;
    protected BlockingQueue<ClientStompFrame> frameQueue = new LinkedBlockingQueue<>();
    protected boolean connected = false;
    protected int serverPingCounter;
-   protected ReaderThread readerThread;
+   //protected ReaderThread readerThread;
+   protected String scheme;
 
+   @Deprecated
    public AbstractStompClientConnection(String version, String host, int port) throws IOException {
       this.version = version;
       this.host = host;
       this.port = port;
+      this.scheme = "tcp";
+
       this.factory = StompFrameFactoryFactory.getFactory(version);
-      socketChannel = SocketChannel.open();
-      initSocket();
    }
 
-   private void initSocket() throws IOException {
-      socketChannel.configureBlocking(true);
-      InetSocketAddress remoteAddr = new InetSocketAddress(host, port);
-      socketChannel.connect(remoteAddr);
-
-      startReaderThread();
-   }
+   public AbstractStompClientConnection(URI uri) throws Exception {
+      parseURI(uri);
+      this.factory = StompFrameFactoryFactory.getFactory(version);
 
-   private void startReaderThread() {
       readBuffer = ByteBuffer.allocateDirect(10240);
       receiveList = new ArrayList<>(10240);
 
-      readerThread = new ReaderThread();
-      readerThread.start();
+      transport = NettyTransportFactory.createTransport(uri);
+      transport.setTransportListener(new StompTransportListener());
+      transport.connect();
+
+      Wait.waitFor(new Wait.Condition() {
+         @Override
+         public boolean isSatisfied() throws Exception {
+            return transport.isConnected();
+         }
+      }, 10000);
+
+      if (!transport.isConnected()) {
+         throw new RuntimeException("Could not connect transport");
+      }
    }
 
-   public void killReaderThread() {
-      readerThread.stop();
+   private void parseURI(URI uri) {
+      scheme = uri.getScheme() == null ? "tcp" : uri.getScheme();
+      host = uri.getHost();
+      port = uri.getPort();
+      this.version = StompClientConnectionFactory.getStompVersionFromURI(uri);
    }
 
    private ClientStompFrame sendFrameInternal(ClientStompFrame frame, boolean wicked) throws IOException, InterruptedException {
@@ -85,8 +103,17 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
       } else {
          buffer = frame.toByteBuffer();
       }
-      while (buffer.remaining() > 0) {
-         socketChannel.write(buffer);
+
+      ByteBuf buf = Unpooled.copiedBuffer(buffer);
+
+      try {
+         buf.retain();
+         ChannelFuture future = transport.send(buf);
+         if (future != null) {
+            future.awaitUninterruptibly();
+         }
+      } finally {
+         buf.release();
       }
 
       //now response
@@ -179,35 +206,78 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
    }
 
    protected void close() throws IOException {
-      socketChannel.close();
+      transport.close();
    }
 
-   private class ReaderThread extends Thread {
+   private class StompTransportListener implements NettyTransportListener {
 
+      /**
+       * Called when new incoming data has become available.
+       *
+       * @param incoming the next incoming packet of data.
+       */
       @Override
-      public void run() {
-         try {
-            int n = socketChannel.read(readBuffer);
-
-            while (n >= 0) {
-               if (n > 0) {
-                  receiveBytes(n);
-               }
-               n = socketChannel.read(readBuffer);
-            }
-            //peer closed
-            close();
-
-         } catch (IOException e) {
-            try {
-               close();
-            } catch (IOException e1) {
-               //ignore
+      public void onData(ByteBuf incoming) {
+         while (incoming.readableBytes() > 0) {
+            int bytes = incoming.readableBytes();
+            if (incoming.readableBytes() < readBuffer.remaining()) {
+               ByteBuffer byteBuffer = ByteBuffer.allocate(incoming.readableBytes());
+               incoming.readBytes(byteBuffer);
+               byteBuffer.rewind();
+               readBuffer.put(byteBuffer);
+               receiveBytes(bytes);
+            } else {
+               incoming.readBytes(readBuffer);
+               receiveBytes(bytes - incoming.readableBytes());
             }
          }
       }
+
+      /**
+       * Called if the connection state becomes closed.
+       */
+      @Override
+      public void onTransportClosed() {
+      }
+
+      /**
+       * Called when an error occurs during normal Transport operations.
+       *
+       * @param cause the error that triggered this event.
+       */
+      @Override
+      public void onTransportError(Throwable cause) {
+         throw new RuntimeException(cause);
+      }
    }
 
+//   private class ReaderThread extends Thread {
+//
+//      @Override
+//      public void run() {
+//         try {
+//            transport.setTransportListener();
+//            int n = Z..read(readBuffer);
+//
+//            while (n >= 0) {
+//               if (n > 0) {
+//                  receiveBytes(n);
+//               }
+//               n = socketChannel.read(readBuffer);
+//            }
+//            //peer closed
+//            close();
+//
+//         } catch (IOException e) {
+//            try {
+//               close();
+//            } catch (IOException e1) {
+//               //ignore
+//            }
+//         }
+//      }
+//   }
+
    @Override
    public ClientStompFrame connect() throws Exception {
       return connect(null, null);
@@ -230,7 +300,7 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
 
    @Override
    public boolean isConnected() {
-      return connected && socketChannel.isConnected();
+      return connected && transport.isConnected();
    }
 
    @Override
@@ -243,6 +313,11 @@ public abstract class AbstractStompClientConnection implements StompClientConnec
       return this.frameQueue.size();
    }
 
+   @Override
+   public void closeTransport() throws IOException {
+      transport.close();
+   }
+
    protected class Pinger extends Thread {
 
       long pingInterval;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
index 93801f9..1b77e12 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/ClientStompFrame.java
@@ -18,6 +18,8 @@ package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.nio.ByteBuffer;
 
+import io.netty.buffer.ByteBuf;
+
 /**
  * pls use factory to create frames.
  */
@@ -25,6 +27,8 @@ public interface ClientStompFrame {
 
    ByteBuffer toByteBuffer();
 
+   ByteBuf toNettyByteBuf();
+
    boolean needsReply();
 
    ClientStompFrame setCommand(String command);
@@ -41,6 +45,8 @@ public interface ClientStompFrame {
 
    ByteBuffer toByteBufferWithExtra(String str);
 
+   ByteBuf toNettyByteBufWithExtras(String str);
+
    boolean isPing();
 
    ClientStompFrame setForceOneway();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
index 7be09a5..012bb49 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnection.java
@@ -53,5 +53,6 @@ public interface StompClientConnection {
 
    int getServerPingNumber();
 
+   void closeTransport() throws IOException;
 }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java
index 3a40c99..06d1845 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionFactory.java
@@ -17,9 +17,12 @@
 package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
+import java.net.URI;
 
 public class StompClientConnectionFactory {
 
+   public static final String LATEST_VERSION = "1.2";
+
    //create a raw connection to the host.
    public static StompClientConnection createClientConnection(String version,
                                                               String host,
@@ -36,6 +39,34 @@ public class StompClientConnectionFactory {
       return null;
    }
 
+   public static StompClientConnection createClientConnection(URI uri) throws Exception {
+      String version = getStompVersionFromURI(uri);
+      if ("1.0".equals(version)) {
+         return new StompClientConnectionV10(uri);
+      }
+      if ("1.1".equals(version)) {
+         return new StompClientConnectionV11(uri);
+      }
+      if ("1.2".equals(version)) {
+         return new StompClientConnectionV12(uri);
+      }
+      return null;
+   }
+
+   public static String getStompVersionFromURI(URI uri) {
+      String scheme = uri.getScheme();
+      if (scheme.contains("10")) {
+         return "1.0";
+      }
+      if (scheme.contains("11")) {
+         return "1.1";
+      }
+      if (scheme.contains("12")) {
+         return "1.2";
+      }
+      return LATEST_VERSION;
+   }
+
    public static void main(String[] args) throws Exception {
       StompClientConnection connection = StompClientConnectionFactory.createClientConnection("1.0", "localhost", 61613);
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
index d32823b..56c72db 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV10.java
@@ -17,12 +17,14 @@
 package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
+import java.net.URI;
 
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
 import org.apache.activemq.artemis.tests.integration.IntegrationTestLogger;
 
 public class StompClientConnectionV10 extends AbstractStompClientConnection {
 
+
    public StompClientConnectionV10(String host, int port) throws IOException {
       super("1.0", host, port);
    }
@@ -31,6 +33,10 @@ public class StompClientConnectionV10 extends AbstractStompClientConnection {
       super(version, host, port);
    }
 
+   public StompClientConnectionV10(URI uri) throws Exception {
+      super(uri);
+   }
+
    @Override
    public ClientStompFrame connect(String username, String passcode) throws IOException, InterruptedException {
       return connect(username, passcode, null);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
index 6ef88cb..5f0cca3 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV11.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.UUID;
 
 import org.apache.activemq.artemis.core.protocol.stomp.Stomp;
@@ -31,6 +32,10 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 {
       super(version, host, port);
    }
 
+   public StompClientConnectionV11(URI uri) throws Exception {
+      super(uri);
+   }
+
    @Override
    public ClientStompFrame connect(String username, String passcode, String clientID) throws IOException, InterruptedException {
       ClientStompFrame frame = factory.newFrame(Stomp.Commands.CONNECT);
@@ -96,12 +101,16 @@ public class StompClientConnectionV11 extends StompClientConnectionV10 {
 
       frame.addHeader(Stomp.Headers.RECEIPT_REQUESTED, uuid);
 
-      ClientStompFrame result = this.sendFrame(frame);
-
-      if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
-         throw new IOException("Disconnect failed! " + result);
+      try {
+         if (!transport.isConnected()) {
+            ClientStompFrame result = this.sendFrame(frame);
+            if (result == null || (!Stomp.Responses.RECEIPT.equals(result.getCommand())) || (!uuid.equals(result.getHeader(Stomp.Headers.Response.RECEIPT_ID)))) {
+               throw new IOException("Disconnect failed! " + result);
+            }
+         }
+      } catch (Exception e) {
+         // Transport may have been closed
       }
-
       close();
 
       connected = false;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/c6e5163a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
index 2d8f354..afa1f08 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/stomp/util/StompClientConnectionV12.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.tests.integration.stomp.util;
 
 import java.io.IOException;
+import java.net.URI;
 
 public class StompClientConnectionV12 extends StompClientConnectionV11 {
 
@@ -24,6 +25,10 @@ public class StompClientConnectionV12 extends StompClientConnectionV11 {
       super("1.2", host, port);
    }
 
+   public StompClientConnectionV12(URI uri) throws Exception {
+      super(uri);
+   }
+
    public ClientStompFrame createAnyFrame(String command) {
       return factory.newAnyFrame(command);
    }