You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/08/10 15:07:58 UTC

[2/2] activemq-artemis git commit: ARTEMIS-677 Support websocket subprotocol handshakes

ARTEMIS-677 Support websocket subprotocol handshakes


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7afd0fb0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7afd0fb0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7afd0fb0

Branch: refs/heads/master
Commit: 7afd0fb02842fc4317621a49c983e4abbe689a18
Parents: 2b71022
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Aug 10 12:16:39 2016 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 10 11:07:47 2016 -0400

----------------------------------------------------------------------
 .../protocol/proton/ProtonProtocolManager.java  |   8 +
 .../core/protocol/mqtt/MQTTProtocolManager.java |  14 +-
 .../openwire/OpenWireProtocolManager.java       |   8 +
 .../protocol/stomp/StompProtocolManager.java    |   9 +-
 .../artemis/core/protocol/ProtocolHandler.java  |  14 +-
 .../protocol/core/impl/CoreProtocolManager.java |   8 +
 .../protocol/stomp/WebSocketServerHandler.java  | 162 ------------------
 .../websocket/WebSocketServerHandler.java       | 170 +++++++++++++++++++
 .../spi/core/protocol/ProtocolManager.java      |   8 +
 .../impl/netty/NettyAcceptorFactoryTest.java    |   2 +-
 .../remoting/impl/netty/NettyAcceptorTest.java  |   2 +-
 11 files changed, 234 insertions(+), 171 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
index edd8dd0..163fc6b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.proton;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.concurrent.Executor;
 
@@ -49,6 +50,8 @@ import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME
  */
 public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
 
+   private static final List<String> websocketRegistryNames = Arrays.asList("amqp");
+
    private final ActiveMQServer server;
 
    private MessageConverter protonConverter;
@@ -147,6 +150,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
    }
 
+   @Override
+   public List<String> websocketSubprotocolIdentifiers() {
+      return websocketRegistryNames;
+   }
+
    public String getPubSubPrefix() {
       return pubSubPrefix;
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 1d38fcf..17f7e33 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
 import io.netty.channel.ChannelPipeline;
 import io.netty.handler.codec.mqtt.MqttDecoder;
 import io.netty.handler.codec.mqtt.MqttEncoder;
@@ -34,15 +38,14 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
 import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
 import org.apache.activemq.artemis.spi.core.remoting.Connection;
 
-import java.util.ArrayList;
-import java.util.List;
-
 /**
  * MQTTProtocolManager
  */
 class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection>
         implements NotificationListener {
 
+   private static final List<String> websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1");
+
    private ActiveMQServer server;
 
    private MQTTLogger log = MQTTLogger.LOGGER;
@@ -138,6 +141,11 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterc
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
    }
 
+   @Override
+   public List<String> websocketSubprotocolIdentifiers() {
+      return websocketRegistryNames;
+   }
+
    public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
       super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 50b047a..d8dd639 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,6 +17,7 @@
 package org.apache.activemq.artemis.core.protocol.openwire;
 
 import javax.jms.InvalidClientIDException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -74,6 +75,8 @@ import org.apache.activemq.util.LongSequenceGenerator;
 
 public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
 
+   private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
+
    private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
    private static final IdGenerator ID_GENERATOR = new IdGenerator();
 
@@ -269,6 +272,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
    public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
    }
 
+   @Override
+   public List<String> websocketSubprotocolIdentifiers() {
+      return websocketRegistryNames;
+   }
+
    public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
       String username = info.getUserName();
       String password = info.getPassword();

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 9c92fd1..5de63d3 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp;
 
 import javax.security.cert.X509Certificate;
 import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -56,9 +57,8 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
  * StompProtocolManager
  */
 public class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
-   // Constants -----------------------------------------------------
 
-   // Attributes ----------------------------------------------------
+   private static final List<String> websocketRegistryNames = Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp");
 
    private final ActiveMQServer server;
 
@@ -192,6 +192,11 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
       //Todo move handshake to here
    }
 
+   @Override
+   public List<String> websocketSubprotocolIdentifiers() {
+      return websocketRegistryNames;
+   }
+
    // Public --------------------------------------------------------
 
    public boolean send(final StompConnection connection, final StompFrame frame) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index f4fef21..b4d8de5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -45,7 +46,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
 import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
 import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator;
 import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.protocol.stomp.WebSocketServerHandler;
+import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler;
 import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
 import org.apache.activemq.artemis.utils.ConfigurationHelper;
 
@@ -64,6 +65,8 @@ public class ProtocolHandler {
 
    private HttpKeepAliveRunnable httpKeepAliveRunnable;
 
+   private final List<String> websocketSubprotocolIds;
+
    public ProtocolHandler(Map<String, ProtocolManager> protocolMap,
                           NettyAcceptor nettyAcceptor,
                           final Map<String, Object> configuration,
@@ -72,6 +75,13 @@ public class ProtocolHandler {
       this.nettyAcceptor = nettyAcceptor;
       this.configuration = configuration;
       this.scheduledThreadPool = scheduledThreadPool;
+
+      websocketSubprotocolIds = new ArrayList<>();
+      for (ProtocolManager pm : protocolMap.values()) {
+         if (pm.websocketSubprotocolIdentifiers() != null) {
+            websocketSubprotocolIds.addAll(pm.websocketSubprotocolIdentifiers());
+         }
+      }
    }
 
    public ChannelHandler getProtocolDecoder() {
@@ -106,7 +116,7 @@ public class ProtocolHandler {
             HttpHeaders headers = request.headers();
             String upgrade = headers.get("upgrade");
             if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
-               ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler());
+               ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds));
                ctx.pipeline().addLast(new ProtocolDecoder(false, false));
                ctx.pipeline().remove(this);
                ctx.pipeline().remove("http-handler");

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index 410b0c0..69db679 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.artemis.core.protocol.core.impl;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -63,6 +64,8 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
 
    private static final Logger logger = Logger.getLogger(CoreProtocolManager.class);
 
+   private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
+
    private final ActiveMQServer server;
 
    private final List<Interceptor> incomingInterceptors;
@@ -181,6 +184,11 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
       }
    }
 
+   @Override
+   public List<String> websocketSubprotocolIdentifiers() {
+      return websocketRegistryNames;
+   }
+
    private boolean isArtemis(ActiveMQBuffer buffer) {
       return buffer.getByte(0) == 'A' &&
          buffer.getByte(1) == 'R' &&

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java
deleted file mode 100644
index e276047..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server.protocol.stomp;
-
-import java.nio.charset.StandardCharsets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
-import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
-
-import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static io.netty.handler.codec.http.HttpMethod.GET;
-import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
-
-   private static final String WEBSOCKET_PATH = "/stomp";
-
-   private HttpRequest httpRequest;
-   private WebSocketServerHandshaker handshaker;
-   private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder();
-
-   @Override
-   public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
-      if (msg instanceof FullHttpRequest) {
-         handleHttpRequest(ctx, (FullHttpRequest) msg);
-      }
-      else if (msg instanceof WebSocketFrame) {
-         WebSocketFrame frame = (WebSocketFrame) msg;
-         boolean handle = handleWebSocketFrame(ctx, frame);
-         if (handle) {
-            ctx.fireChannelRead(frame.content().retain());
-         }
-      }
-   }
-
-   private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
-      // Allow only GET methods.
-      if (req.getMethod() != GET) {
-         sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
-         return;
-      }
-
-      // Handshake
-      WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), "v10.stomp,v11.stomp", false);
-      this.httpRequest = req;
-      this.handshaker = wsFactory.newHandshaker(req);
-      if (this.handshaker == null) {
-         WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
-      }
-      else {
-         ChannelFuture handshake = this.handshaker.handshake(ctx.channel(), req);
-         handshake.addListener(new ChannelFutureListener() {
-
-            @Override
-            public void operationComplete(ChannelFuture future) throws Exception {
-               if (future.isSuccess()) {
-                  // we need to insert an encoder that takes the underlying ChannelBuffer of a StompFrame.toActiveMQBuffer and
-                  // wrap it in a binary web socket frame before letting the wsencoder send it on the wire
-                  future.channel().pipeline().addAfter("wsencoder", "binary-websocket-encoder", BINARY_WEBSOCKET_ENCODER);
-               }
-               else {
-                  // Handshake failed, fire an exceptionCaught event
-                  future.channel().pipeline().fireExceptionCaught(future.cause());
-               }
-            }
-         });
-      }
-   }
-
-   private boolean handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
-
-      // Check for closing frame
-      if (frame instanceof CloseWebSocketFrame) {
-         this.handshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
-         return false;
-      }
-      else if (frame instanceof PingWebSocketFrame) {
-         ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
-         return false;
-      }
-      else if (!(frame instanceof TextWebSocketFrame) && !(frame instanceof BinaryWebSocketFrame)) {
-         throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
-      }
-      return true;
-   }
-
-   private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, FullHttpResponse res) {
-      // Generate an error page if response status code is not OK (200).
-      if (res.getStatus().code() != 200) {
-         res.content().writeBytes(res.getStatus().toString().getBytes(StandardCharsets.UTF_8));
-         setContentLength(res, res.content().readableBytes());
-      }
-
-      // Send the response and close the connection if necessary.
-      ChannelFuture f = ctx.writeAndFlush(res);
-      if (!isKeepAlive(req) || res.getStatus().code() != 200) {
-         f.addListener(ChannelFutureListener.CLOSE);
-      }
-   }
-
-   @Override
-   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
-      cause.printStackTrace();
-      ctx.close();
-   }
-
-   private String getWebSocketLocation(HttpRequest req) {
-      return "ws://" + req.headers().get(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
-   }
-
-   public HttpRequest getHttpRequest() {
-      return this.httpRequest;
-   }
-
-   @Sharable
-   private static final class BinaryWebSocketEncoder extends ChannelOutboundHandlerAdapter {
-
-      @Override
-      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
-         if (msg instanceof ByteBuf) {
-            msg = new BinaryWebSocketFrame((ByteBuf) msg);
-         }
-
-         ctx.write(msg, promise);
-      }
-
-   }
-}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
new file mode 100644
index 0000000..671cb76
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.protocol.websocket;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
+import org.apache.activemq.artemis.utils.StringUtil;
+
+import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
+
+   private static final String WEBSOCKET_PATH = "/stomp";
+
+   private HttpRequest httpRequest;
+   private WebSocketServerHandshaker handshaker;
+   private List<String> supportedProtocols;
+   private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder();
+
+   public WebSocketServerHandler(List<String> supportedProtocols) {
+      this.supportedProtocols = supportedProtocols;
+   }
+
+   @Override
+   public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+      if (msg instanceof FullHttpRequest) {
+         handleHttpRequest(ctx, (FullHttpRequest) msg);
+      }
+      else if (msg instanceof WebSocketFrame) {
+         WebSocketFrame frame = (WebSocketFrame) msg;
+         boolean handle = handleWebSocketFrame(ctx, frame);
+         if (handle) {
+            ctx.fireChannelRead(frame.content().retain());
+         }
+      }
+   }
+
+   private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
+      // Allow only GET methods.
+      if (req.getMethod() != GET) {
+         sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
+         return;
+      }
+
+      // Handshake
+      String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ",");
+      WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV,false);
+      this.httpRequest = req;
+      this.handshaker = wsFactory.newHandshaker(req);
+      if (this.handshaker == null) {
+         WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
+      }
+      else {
+         ChannelFuture handshake = this.handshaker.handshake(ctx.channel(), req);
+         handshake.addListener(new ChannelFutureListener() {
+
+            @Override
+            public void operationComplete(ChannelFuture future) throws Exception {
+               if (future.isSuccess()) {
+                  // we need to insert an encoder that takes the underlying ChannelBuffer of a StompFrame.toActiveMQBuffer and
+                  // wrap it in a binary web socket frame before letting the wsencoder send it on the wire
+                  future.channel().pipeline().addAfter("wsencoder", "binary-websocket-encoder", BINARY_WEBSOCKET_ENCODER);
+               }
+               else {
+                  // Handshake failed, fire an exceptionCaught event
+                  future.channel().pipeline().fireExceptionCaught(future.cause());
+               }
+            }
+         });
+      }
+   }
+
+   private boolean handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
+
+      // Check for closing frame
+      if (frame instanceof CloseWebSocketFrame) {
+         this.handshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
+         return false;
+      }
+      else if (frame instanceof PingWebSocketFrame) {
+         ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
+         return false;
+      }
+      else if (!(frame instanceof TextWebSocketFrame) && !(frame instanceof BinaryWebSocketFrame)) {
+         throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
+      }
+      return true;
+   }
+
+   private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, FullHttpResponse res) {
+      // Generate an error page if response status code is not OK (200).
+      if (res.getStatus().code() != 200) {
+         res.content().writeBytes(res.getStatus().toString().getBytes(StandardCharsets.UTF_8));
+         setContentLength(res, res.content().readableBytes());
+      }
+
+      // Send the response and close the connection if necessary.
+      ChannelFuture f = ctx.writeAndFlush(res);
+      if (!isKeepAlive(req) || res.getStatus().code() != 200) {
+         f.addListener(ChannelFutureListener.CLOSE);
+      }
+   }
+
+   @Override
+   public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+      cause.printStackTrace();
+      ctx.close();
+   }
+
+   private String getWebSocketLocation(HttpRequest req) {
+      return "ws://" + req.headers().get(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
+   }
+
+   public HttpRequest getHttpRequest() {
+      return this.httpRequest;
+   }
+
+   @Sharable
+   private static final class BinaryWebSocketEncoder extends ChannelOutboundHandlerAdapter {
+
+      @Override
+      public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+         if (msg instanceof ByteBuf) {
+            msg = new BinaryWebSocketFrame((ByteBuf) msg);
+         }
+
+         ctx.write(msg, promise);
+      }
+
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
index 3de5d5d..62befaf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
@@ -61,4 +61,12 @@ public interface ProtocolManager<P extends BaseInterceptor> {
    boolean acceptsNoHandshake();
 
    void handshake(NettyServerConnection connection, ActiveMQBuffer buffer);
+
+   /**
+    * A list of the IANA websocket subprotocol identifiers supported by this protocol manager.  These are used
+    * during the websocket subprotocol handshake.
+    *
+    * @return A list of subprotocol ids
+    */
+   List<String> websocketSubprotocolIdentifiers();
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
index 46d7aa3..1e6a21b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
@@ -72,7 +72,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase {
 
       };
 
-      Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), null);
+      Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), new HashMap<String, ProtocolManager>());
 
       Assert.assertTrue(acceptor instanceof NettyAcceptor);
    }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
index 1379628..7f410ef 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
@@ -95,7 +95,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
          }
       };
       pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
-      NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, null);
+      NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, new HashMap<String, ProtocolManager>());
 
       addActiveMQComponent(acceptor);
       acceptor.start();