You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/08/10 15:07:58 UTC
[2/2] activemq-artemis git commit: ARTEMIS-677 Support websocket
subprotocol handshakes
ARTEMIS-677 Support websocket subprotocol handshakes
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/7afd0fb0
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/7afd0fb0
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/7afd0fb0
Branch: refs/heads/master
Commit: 7afd0fb02842fc4317621a49c983e4abbe689a18
Parents: 2b71022
Author: Martyn Taylor <mt...@redhat.com>
Authored: Wed Aug 10 12:16:39 2016 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Aug 10 11:07:47 2016 -0400
----------------------------------------------------------------------
.../protocol/proton/ProtonProtocolManager.java | 8 +
.../core/protocol/mqtt/MQTTProtocolManager.java | 14 +-
.../openwire/OpenWireProtocolManager.java | 8 +
.../protocol/stomp/StompProtocolManager.java | 9 +-
.../artemis/core/protocol/ProtocolHandler.java | 14 +-
.../protocol/core/impl/CoreProtocolManager.java | 8 +
.../protocol/stomp/WebSocketServerHandler.java | 162 ------------------
.../websocket/WebSocketServerHandler.java | 170 +++++++++++++++++++
.../spi/core/protocol/ProtocolManager.java | 8 +
.../impl/netty/NettyAcceptorFactoryTest.java | 2 +-
.../remoting/impl/netty/NettyAcceptorTest.java | 2 +-
11 files changed, 234 insertions(+), 171 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
index edd8dd0..163fc6b 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/ProtonProtocolManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.proton;
+import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Executor;
@@ -49,6 +50,8 @@ import static org.proton.plug.context.AMQPConstants.Connection.DEFAULT_MAX_FRAME
*/
public class ProtonProtocolManager implements ProtocolManager<Interceptor>, NotificationListener {
+ private static final List<String> websocketRegistryNames = Arrays.asList("amqp");
+
private final ActiveMQServer server;
private MessageConverter protonConverter;
@@ -147,6 +150,11 @@ public class ProtonProtocolManager implements ProtocolManager<Interceptor>, Noti
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
+ @Override
+ public List<String> websocketSubprotocolIdentifiers() {
+ return websocketRegistryNames;
+ }
+
public String getPubSubPrefix() {
return pubSubPrefix;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
index 1d38fcf..17f7e33 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolManager.java
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.core.protocol.mqtt;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
@@ -34,15 +38,14 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* MQTTProtocolManager
*/
class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterceptor,MQTTConnection>
implements NotificationListener {
+ private static final List<String> websocketRegistryNames = Arrays.asList("mqtt", "mqttv3.1");
+
private ActiveMQServer server;
private MQTTLogger log = MQTTLogger.LOGGER;
@@ -138,6 +141,11 @@ class MQTTProtocolManager extends AbstractProtocolManager<MqttMessage,MQTTInterc
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
+ @Override
+ public List<String> websocketSubprotocolIdentifiers() {
+ return websocketRegistryNames;
+ }
+
public void invokeIncoming(MqttMessage mqttMessage, MQTTConnection connection) {
super.invokeInterceptors(this.incomingInterceptors, mqttMessage, connection);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 50b047a..d8dd639 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -17,6 +17,7 @@
package org.apache.activemq.artemis.core.protocol.openwire;
import javax.jms.InvalidClientIDException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
@@ -74,6 +75,8 @@ import org.apache.activemq.util.LongSequenceGenerator;
public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, ClusterTopologyListener {
+ private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
+
private static final IdGenerator BROKER_ID_GENERATOR = new IdGenerator();
private static final IdGenerator ID_GENERATOR = new IdGenerator();
@@ -269,6 +272,11 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, Cl
public void handshake(NettyServerConnection connection, ActiveMQBuffer buffer) {
}
+ @Override
+ public List<String> websocketSubprotocolIdentifiers() {
+ return websocketRegistryNames;
+ }
+
public void addConnection(OpenWireConnection connection, ConnectionInfo info) throws Exception {
String username = info.getUserName();
String password = info.getPassword();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
index 9c92fd1..5de63d3 100644
--- a/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
+++ b/artemis-protocols/artemis-stomp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/stomp/StompProtocolManager.java
@@ -18,6 +18,7 @@ package org.apache.activemq.artemis.core.protocol.stomp;
import javax.security.cert.X509Certificate;
import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -56,9 +57,8 @@ import static org.apache.activemq.artemis.core.protocol.stomp.ActiveMQStompProto
* StompProtocolManager
*/
public class StompProtocolManager extends AbstractProtocolManager<StompFrame,StompFrameInterceptor,StompConnection> {
- // Constants -----------------------------------------------------
- // Attributes ----------------------------------------------------
+ private static final List<String> websocketRegistryNames = Arrays.asList("v10.stomp", "v11.stomp", "v12.stomp");
private final ActiveMQServer server;
@@ -192,6 +192,11 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame,Sto
//Todo move handshake to here
}
+ @Override
+ public List<String> websocketSubprotocolIdentifiers() {
+ return websocketRegistryNames;
+ }
+
// Public --------------------------------------------------------
public boolean send(final StompConnection connection, final StompFrame frame) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
index f4fef21..b4d8de5 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/ProtocolHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol;
+import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -45,7 +46,7 @@ import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnector;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq.artemis.core.remoting.impl.netty.PartialPooledByteBufAllocator;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
-import org.apache.activemq.artemis.core.server.protocol.stomp.WebSocketServerHandler;
+import org.apache.activemq.artemis.core.server.protocol.websocket.WebSocketServerHandler;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
@@ -64,6 +65,8 @@ public class ProtocolHandler {
private HttpKeepAliveRunnable httpKeepAliveRunnable;
+ private final List<String> websocketSubprotocolIds;
+
public ProtocolHandler(Map<String, ProtocolManager> protocolMap,
NettyAcceptor nettyAcceptor,
final Map<String, Object> configuration,
@@ -72,6 +75,13 @@ public class ProtocolHandler {
this.nettyAcceptor = nettyAcceptor;
this.configuration = configuration;
this.scheduledThreadPool = scheduledThreadPool;
+
+ websocketSubprotocolIds = new ArrayList<>();
+ for (ProtocolManager pm : protocolMap.values()) {
+ if (pm.websocketSubprotocolIdentifiers() != null) {
+ websocketSubprotocolIds.addAll(pm.websocketSubprotocolIdentifiers());
+ }
+ }
}
public ChannelHandler getProtocolDecoder() {
@@ -106,7 +116,7 @@ public class ProtocolHandler {
HttpHeaders headers = request.headers();
String upgrade = headers.get("upgrade");
if (upgrade != null && upgrade.equalsIgnoreCase("websocket")) {
- ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler());
+ ctx.pipeline().addLast("websocket-handler", new WebSocketServerHandler(websocketSubprotocolIds));
ctx.pipeline().addLast(new ProtocolDecoder(false, false));
ctx.pipeline().remove(this);
ctx.pipeline().remove("http-handler");
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
index 410b0c0..69db679 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/CoreProtocolManager.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.core.impl;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
@@ -63,6 +64,8 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
private static final Logger logger = Logger.getLogger(CoreProtocolManager.class);
+ private static final List<String> websocketRegistryNames = Collections.EMPTY_LIST;
+
private final ActiveMQServer server;
private final List<Interceptor> incomingInterceptors;
@@ -181,6 +184,11 @@ public class CoreProtocolManager implements ProtocolManager<Interceptor> {
}
}
+ @Override
+ public List<String> websocketSubprotocolIdentifiers() {
+ return websocketRegistryNames;
+ }
+
private boolean isArtemis(ActiveMQBuffer buffer) {
return buffer.getByte(0) == 'A' &&
buffer.getByte(1) == 'R' &&
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java
deleted file mode 100644
index e276047..0000000
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/stomp/WebSocketServerHandler.java
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.artemis.core.server.protocol.stomp;
-
-import java.nio.charset.StandardCharsets;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.ChannelFuture;
-import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelOutboundHandlerAdapter;
-import io.netty.channel.ChannelPromise;
-import io.netty.channel.SimpleChannelInboundHandler;
-import io.netty.handler.codec.http.DefaultFullHttpResponse;
-import io.netty.handler.codec.http.FullHttpRequest;
-import io.netty.handler.codec.http.FullHttpResponse;
-import io.netty.handler.codec.http.HttpHeaders;
-import io.netty.handler.codec.http.HttpRequest;
-import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketFrame;
-import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
-import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
-
-import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
-import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
-import static io.netty.handler.codec.http.HttpMethod.GET;
-import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
-
-public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
-
- private static final String WEBSOCKET_PATH = "/stomp";
-
- private HttpRequest httpRequest;
- private WebSocketServerHandshaker handshaker;
- private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder();
-
- @Override
- public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- if (msg instanceof FullHttpRequest) {
- handleHttpRequest(ctx, (FullHttpRequest) msg);
- }
- else if (msg instanceof WebSocketFrame) {
- WebSocketFrame frame = (WebSocketFrame) msg;
- boolean handle = handleWebSocketFrame(ctx, frame);
- if (handle) {
- ctx.fireChannelRead(frame.content().retain());
- }
- }
- }
-
- private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
- // Allow only GET methods.
- if (req.getMethod() != GET) {
- sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
- return;
- }
-
- // Handshake
- WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), "v10.stomp,v11.stomp", false);
- this.httpRequest = req;
- this.handshaker = wsFactory.newHandshaker(req);
- if (this.handshaker == null) {
- WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
- }
- else {
- ChannelFuture handshake = this.handshaker.handshake(ctx.channel(), req);
- handshake.addListener(new ChannelFutureListener() {
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (future.isSuccess()) {
- // we need to insert an encoder that takes the underlying ChannelBuffer of a StompFrame.toActiveMQBuffer and
- // wrap it in a binary web socket frame before letting the wsencoder send it on the wire
- future.channel().pipeline().addAfter("wsencoder", "binary-websocket-encoder", BINARY_WEBSOCKET_ENCODER);
- }
- else {
- // Handshake failed, fire an exceptionCaught event
- future.channel().pipeline().fireExceptionCaught(future.cause());
- }
- }
- });
- }
- }
-
- private boolean handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
-
- // Check for closing frame
- if (frame instanceof CloseWebSocketFrame) {
- this.handshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
- return false;
- }
- else if (frame instanceof PingWebSocketFrame) {
- ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
- return false;
- }
- else if (!(frame instanceof TextWebSocketFrame) && !(frame instanceof BinaryWebSocketFrame)) {
- throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
- }
- return true;
- }
-
- private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, FullHttpResponse res) {
- // Generate an error page if response status code is not OK (200).
- if (res.getStatus().code() != 200) {
- res.content().writeBytes(res.getStatus().toString().getBytes(StandardCharsets.UTF_8));
- setContentLength(res, res.content().readableBytes());
- }
-
- // Send the response and close the connection if necessary.
- ChannelFuture f = ctx.writeAndFlush(res);
- if (!isKeepAlive(req) || res.getStatus().code() != 200) {
- f.addListener(ChannelFutureListener.CLOSE);
- }
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- cause.printStackTrace();
- ctx.close();
- }
-
- private String getWebSocketLocation(HttpRequest req) {
- return "ws://" + req.headers().get(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
- }
-
- public HttpRequest getHttpRequest() {
- return this.httpRequest;
- }
-
- @Sharable
- private static final class BinaryWebSocketEncoder extends ChannelOutboundHandlerAdapter {
-
- @Override
- public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
- if (msg instanceof ByteBuf) {
- msg = new BinaryWebSocketFrame((ByteBuf) msg);
- }
-
- ctx.write(msg, promise);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
new file mode 100644
index 0000000..671cb76
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/protocol/websocket/WebSocketServerHandler.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.server.protocol.websocket;
+
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketFrame;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshaker;
+import io.netty.handler.codec.http.websocketx.WebSocketServerHandshakerFactory;
+import org.apache.activemq.artemis.utils.StringUtil;
+
+import static io.netty.handler.codec.http.HttpHeaders.isKeepAlive;
+import static io.netty.handler.codec.http.HttpHeaders.setContentLength;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+
+public class WebSocketServerHandler extends SimpleChannelInboundHandler<Object> {
+
+ private static final String WEBSOCKET_PATH = "/stomp";
+
+ private HttpRequest httpRequest;
+ private WebSocketServerHandshaker handshaker;
+ private List<String> supportedProtocols;
+ private static final BinaryWebSocketEncoder BINARY_WEBSOCKET_ENCODER = new BinaryWebSocketEncoder();
+
+ public WebSocketServerHandler(List<String> supportedProtocols) {
+ this.supportedProtocols = supportedProtocols;
+ }
+
+ @Override
+ public void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg instanceof FullHttpRequest) {
+ handleHttpRequest(ctx, (FullHttpRequest) msg);
+ }
+ else if (msg instanceof WebSocketFrame) {
+ WebSocketFrame frame = (WebSocketFrame) msg;
+ boolean handle = handleWebSocketFrame(ctx, frame);
+ if (handle) {
+ ctx.fireChannelRead(frame.content().retain());
+ }
+ }
+ }
+
+ private void handleHttpRequest(ChannelHandlerContext ctx, FullHttpRequest req) throws Exception {
+ // Allow only GET methods.
+ if (req.getMethod() != GET) {
+ sendHttpResponse(ctx, req, new DefaultFullHttpResponse(HTTP_1_1, FORBIDDEN));
+ return;
+ }
+
+ // Handshake
+ String supportedProtocolsCSV = StringUtil.joinStringList(supportedProtocols, ",");
+ WebSocketServerHandshakerFactory wsFactory = new WebSocketServerHandshakerFactory(this.getWebSocketLocation(req), supportedProtocolsCSV,false);
+ this.httpRequest = req;
+ this.handshaker = wsFactory.newHandshaker(req);
+ if (this.handshaker == null) {
+ WebSocketServerHandshakerFactory.sendUnsupportedVersionResponse(ctx.channel());
+ }
+ else {
+ ChannelFuture handshake = this.handshaker.handshake(ctx.channel(), req);
+ handshake.addListener(new ChannelFutureListener() {
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ // we need to insert an encoder that takes the underlying ChannelBuffer of a StompFrame.toActiveMQBuffer and
+ // wrap it in a binary web socket frame before letting the wsencoder send it on the wire
+ future.channel().pipeline().addAfter("wsencoder", "binary-websocket-encoder", BINARY_WEBSOCKET_ENCODER);
+ }
+ else {
+ // Handshake failed, fire an exceptionCaught event
+ future.channel().pipeline().fireExceptionCaught(future.cause());
+ }
+ }
+ });
+ }
+ }
+
+ private boolean handleWebSocketFrame(ChannelHandlerContext ctx, WebSocketFrame frame) {
+
+ // Check for closing frame
+ if (frame instanceof CloseWebSocketFrame) {
+ this.handshaker.close(ctx.channel(), ((CloseWebSocketFrame) frame).retain());
+ return false;
+ }
+ else if (frame instanceof PingWebSocketFrame) {
+ ctx.writeAndFlush(new PongWebSocketFrame(frame.content().retain()));
+ return false;
+ }
+ else if (!(frame instanceof TextWebSocketFrame) && !(frame instanceof BinaryWebSocketFrame)) {
+ throw new UnsupportedOperationException(String.format("%s frame types not supported", frame.getClass().getName()));
+ }
+ return true;
+ }
+
+ private void sendHttpResponse(ChannelHandlerContext ctx, HttpRequest req, FullHttpResponse res) {
+ // Generate an error page if response status code is not OK (200).
+ if (res.getStatus().code() != 200) {
+ res.content().writeBytes(res.getStatus().toString().getBytes(StandardCharsets.UTF_8));
+ setContentLength(res, res.content().readableBytes());
+ }
+
+ // Send the response and close the connection if necessary.
+ ChannelFuture f = ctx.writeAndFlush(res);
+ if (!isKeepAlive(req) || res.getStatus().code() != 200) {
+ f.addListener(ChannelFutureListener.CLOSE);
+ }
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ cause.printStackTrace();
+ ctx.close();
+ }
+
+ private String getWebSocketLocation(HttpRequest req) {
+ return "ws://" + req.headers().get(HttpHeaders.Names.HOST) + WEBSOCKET_PATH;
+ }
+
+ public HttpRequest getHttpRequest() {
+ return this.httpRequest;
+ }
+
+ @Sharable
+ private static final class BinaryWebSocketEncoder extends ChannelOutboundHandlerAdapter {
+
+ @Override
+ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
+ if (msg instanceof ByteBuf) {
+ msg = new BinaryWebSocketFrame((ByteBuf) msg);
+ }
+
+ ctx.write(msg, promise);
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
index 3de5d5d..62befaf 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/spi/core/protocol/ProtocolManager.java
@@ -61,4 +61,12 @@ public interface ProtocolManager<P extends BaseInterceptor> {
boolean acceptsNoHandshake();
void handshake(NettyServerConnection connection, ActiveMQBuffer buffer);
+
+ /**
+ * A list of the IANA websocket subprotocol identifiers supported by this protocol manager. These are used
+ * during the websocket subprotocol handshake.
+ *
+ * @return A list of subprotocol ids
+ */
+ List<String> websocketSubprotocolIdentifiers();
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
index 46d7aa3..1e6a21b 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorFactoryTest.java
@@ -72,7 +72,7 @@ public class NettyAcceptorFactoryTest extends ActiveMQTestBase {
};
- Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), null);
+ Acceptor acceptor = factory.createAcceptor("netty", null, params, handler, listener, Executors.newCachedThreadPool(ActiveMQThreadFactory.defaultThreadFactory()), Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory()), new HashMap<String, ProtocolManager>());
Assert.assertTrue(acceptor instanceof NettyAcceptor);
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/7afd0fb0/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
----------------------------------------------------------------------
diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
index 1379628..7f410ef 100644
--- a/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
+++ b/tests/unit-tests/src/test/java/org/apache/activemq/artemis/tests/unit/core/remoting/impl/netty/NettyAcceptorTest.java
@@ -95,7 +95,7 @@ public class NettyAcceptorTest extends ActiveMQTestBase {
}
};
pool2 = Executors.newScheduledThreadPool(ActiveMQDefaultConfiguration.getDefaultScheduledThreadPoolMaxSize(), ActiveMQThreadFactory.defaultThreadFactory());
- NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, null);
+ NettyAcceptor acceptor = new NettyAcceptor("netty", null, params, handler, listener, pool2, new HashMap<String, ProtocolManager>());
addActiveMQComponent(acceptor);
acceptor.start();