You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/06/09 11:37:46 UTC
[3/5] storm git commit: STORM-1038: Upgrade to latest Netty
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
index 45fa2d8..1818dfa 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyClientState.java
@@ -12,15 +12,10 @@
package org.apache.storm.messaging.netty;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
final class SaslNettyClientState {
- public static final ChannelLocal<SaslNettyClient> getSaslNettyClient = new ChannelLocal<SaslNettyClient>() {
- protected SaslNettyClient initialValue(Channel channel) {
- return null;
- }
- };
+ public static final AttributeKey<SaslNettyClient> SASL_NETTY_CLIENT = AttributeKey.valueOf("sasl.netty.client");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
index f8cb386..2c79519 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslNettyServerState.java
@@ -12,14 +12,9 @@
package org.apache.storm.messaging.netty;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelLocal;
+import org.apache.storm.shade.io.netty.util.AttributeKey;
final class SaslNettyServerState {
- public static final ChannelLocal<SaslNettyServer> getSaslNettyServer = new ChannelLocal<SaslNettyServer>() {
- protected SaslNettyServer initialValue(Channel channel) {
- return null;
- }
- };
+ public static final AttributeKey<SaslNettyServer> SASL_NETTY_SERVER = AttributeKey.valueOf("sasl.netty.server");
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
index 293cd38..25b0aa2 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormClientHandler.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
@@ -13,21 +14,18 @@
package org.apache.storm.messaging.netty;
import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
+public class SaslStormClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory
.getLogger(SaslStormClientHandler.class);
- long start_time;
- private ISaslClient client;
+ private final long start_time;
+ private final ISaslClient client;
/**
* Used for client or server's token to send or receive from each other.
*/
@@ -41,25 +39,22 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
}
@Override
- public void channelConnected(ChannelHandlerContext ctx,
- ChannelStateEvent event) {
- // register the newly established channel
- Channel channel = ctx.getChannel();
- client.channelConnected(channel);
+ public void channelActive(ChannelHandlerContext ctx) {
+ Channel channel = ctx.channel();
+ LOG.info("Connection established from " + channel.localAddress()
+ + " to " + channel.remoteAddress());
try {
- SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
- .get(channel);
+ SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
if (saslNettyClient == null) {
LOG.debug("Creating saslNettyClient now " + "for channel: "
+ channel);
saslNettyClient = new SaslNettyClient(name, token);
- SaslNettyClientState.getSaslNettyClient.set(channel,
- saslNettyClient);
+ channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).set(saslNettyClient);
}
LOG.debug("Sending SASL_TOKEN_MESSAGE_REQUEST");
- channel.write(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST);
+ channel.writeAndFlush(ControlMessage.SASL_TOKEN_MESSAGE_REQUEST, channel.voidPromise());
} catch (Exception e) {
LOG.error("Failed to authenticate with server " + "due to error: ",
e);
@@ -67,54 +62,65 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
- throws Exception {
+ public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
LOG.debug("send/recv time (ms): {}",
(System.currentTimeMillis() - start_time));
- Channel channel = ctx.getChannel();
-
+ // examine the response message from server
+ if (message instanceof ControlMessage) {
+ handleControlMessage(ctx, (ControlMessage) message);
+ } else if (message instanceof SaslMessageToken) {
+ handleSaslMessageToken(ctx, (SaslMessageToken) message);
+ } else {
+ LOG.error("Unexpected message from server: {}", message);
+ }
+ }
+
+ private SaslNettyClient getChannelSaslNettyClient(Channel channel) throws Exception {
// Generate SASL response to server using Channel-local SASL client.
- SaslNettyClient saslNettyClient = SaslNettyClientState.getSaslNettyClient
- .get(channel);
+ SaslNettyClient saslNettyClient = channel.attr(SaslNettyClientState.SASL_NETTY_CLIENT).get();
if (saslNettyClient == null) {
throw new Exception("saslNettyClient was unexpectedly "
- + "null for channel: " + channel);
+ + "null for channel: " + channel);
}
+ return saslNettyClient;
+ }
+
+ private void handleControlMessage(ChannelHandlerContext ctx, ControlMessage controlMessage) throws Exception {
+ SaslNettyClient saslNettyClient = getChannelSaslNettyClient(ctx.channel());
+ if (controlMessage == ControlMessage.SASL_COMPLETE_REQUEST) {
+ LOG.debug("Server has sent us the SaslComplete "
+ + "message. Allowing normal work to proceed.");
- // examine the response message from server
- if (event.getMessage() instanceof ControlMessage) {
- ControlMessage msg = (ControlMessage) event.getMessage();
- if (msg == ControlMessage.SASL_COMPLETE_REQUEST) {
- LOG.debug("Server has sent us the SaslComplete "
- + "message. Allowing normal work to proceed.");
-
- if (!saslNettyClient.isComplete()) {
- LOG.error("Server returned a Sasl-complete message, "
- + "but as far as we can tell, we are not authenticated yet.");
- throw new Exception("Server returned a "
- + "Sasl-complete message, but as far as "
- + "we can tell, we are not authenticated yet.");
- }
- ctx.getPipeline().remove(this);
- this.client.channelReady();
-
- // We call fireMessageReceived since the client is allowed to
- // perform this request. The client's request will now proceed
- // to the next pipeline component namely StormClientHandler.
- Channels.fireMessageReceived(ctx, msg);
- return;
+ if (!saslNettyClient.isComplete()) {
+ LOG.error("Server returned a Sasl-complete message, "
+ + "but as far as we can tell, we are not authenticated yet.");
+ throw new Exception("Server returned a "
+ + "Sasl-complete message, but as far as "
+ + "we can tell, we are not authenticated yet.");
}
+ ctx.pipeline().remove(this);
+ this.client.channelReady(ctx.channel());
+
+ // We call fireChannelRead since the client is allowed to
+ // perform this request. The client's request will now proceed
+ // to the next pipeline component namely StormClientHandler.
+ ctx.fireChannelRead(controlMessage);
+ } else {
+ LOG.warn("Unexpected control message: {}", controlMessage);
}
- SaslMessageToken saslTokenMessage = (SaslMessageToken) event
- .getMessage();
+ }
+
+ private void handleSaslMessageToken(ChannelHandlerContext ctx, SaslMessageToken saslMessageToken) throws Exception {
+ Channel channel = ctx.channel();
+ SaslNettyClient saslNettyClient = getChannelSaslNettyClient(channel);
LOG.debug("Responding to server's token of length: "
- + saslTokenMessage.getSaslToken().length);
+ + saslMessageToken.getSaslToken().length);
// Generate SASL response (but we only actually send the response if
// it's non-null.
byte[] responseToServer = saslNettyClient
- .saslResponse(saslTokenMessage);
+ .saslResponse(saslMessageToken);
if (responseToServer == null) {
// If we generate a null response, then authentication has completed
// (if not, warn), and return without sending a response back to the
@@ -127,7 +133,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
throw new Exception("Server response is null, but as far as "
+ "we can tell, we are not authenticated yet.");
}
- this.client.channelReady();
+ this.client.channelReady(channel);
return;
} else {
LOG.debug("Response to server token has length:"
@@ -136,7 +142,7 @@ public class SaslStormClientHandler extends SimpleChannelUpstreamHandler {
// Construct a message containing the SASL response and send it to the
// server.
SaslMessageToken saslResponse = new SaslMessageToken(responseToServer);
- channel.write(saslResponse);
+ channel.writeAndFlush(saslResponse, channel.voidPromise());
}
private void getSASLCredentials() throws IOException {
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
index 7db90db..64a8bae 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerAuthorizeHandler.java
@@ -12,21 +12,18 @@
package org.apache.storm.messaging.netty;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Authorize or deny client requests based on existence and completeness of client's SASL authentication.
*/
-public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandler {
+public class SaslStormServerAuthorizeHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory
- .getLogger(SaslStormServerHandler.class);
+ .getLogger(SaslStormServerAuthorizeHandler.class);
/**
* Constructor.
@@ -35,19 +32,16 @@ public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandle
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- Object msg = e.getMessage();
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == null) {
return;
}
- Channel channel = ctx.getChannel();
LOG.debug("messageReceived: Checking whether the client is authorized to send messages to the server ");
// Authorize: client is allowed to doRequest() if and only if the client
// has successfully authenticated with this server.
- SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
- .get(channel);
+ SaslNettyServer saslNettyServer = ctx.channel().attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
if (saslNettyServer == null) {
LOG.warn("messageReceived: This client is *NOT* authorized to perform "
@@ -70,9 +64,9 @@ public class SaslStormServerAuthorizeHandler extends SimpleChannelUpstreamHandle
+ saslNettyServer.getUserName()
+ " is authorized to do request " + "on server.");
- // We call fireMessageReceived since the client is allowed to perform
+ // We call fireChannelRead since the client is allowed to perform
// this request. The client's request will now proceed to the next
// pipeline component.
- Channels.fireMessageReceived(ctx, msg);
+ ctx.fireChannelRead(msg);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
index 40f8916..ce69a6f 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/SaslStormServerHandler.java
@@ -13,20 +13,17 @@
package org.apache.storm.messaging.netty;
import java.io.IOException;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
+public class SaslStormServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory
.getLogger(SaslStormServerHandler.class);
- ISaslServer server;
+ private final ISaslServer server;
/**
* Used for client or server's token to send or receive from each other.
*/
@@ -39,41 +36,39 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
- throws Exception {
- Object msg = e.getMessage();
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg == null) {
return;
}
- Channel channel = ctx.getChannel();
+ Channel channel = ctx.channel();
if (msg instanceof ControlMessage
- && e.getMessage() == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
+ && msg == ControlMessage.SASL_TOKEN_MESSAGE_REQUEST) {
// initialize server-side SASL functionality, if we haven't yet
// (in which case we are looking at the first SASL message from the
// client).
- SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
- .get(channel);
+ SaslNettyServer saslNettyServer = channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
if (saslNettyServer == null) {
LOG.debug("No saslNettyServer for " + channel
- + " yet; creating now, with topology token: ");
+ + " yet; creating now, with topology token: " + topologyName);
try {
saslNettyServer = new SaslNettyServer(topologyName, token);
- } catch (IOException ioe) {
+ LOG.debug("SaslNettyServer for " + channel
+ + "created with topology token: " + topologyName);
+ } catch (IOException e) {
LOG.error("Error occurred while creating saslNettyServer on server "
- + channel.getLocalAddress()
+ + channel.localAddress()
+ " for client "
- + channel.getRemoteAddress());
- saslNettyServer = null;
+ + channel.remoteAddress());
+ throw new IllegalStateException("Failed to set SaslNettyServerState.SASL_NETTY_SERVER");
}
- SaslNettyServerState.getSaslNettyServer.set(channel,
- saslNettyServer);
+ channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).set(saslNettyServer);
} else {
LOG.debug("Found existing saslNettyServer on server:"
- + channel.getLocalAddress() + " for client "
- + channel.getRemoteAddress());
+ + channel.localAddress() + " for client "
+ + channel.remoteAddress());
}
LOG.debug("processToken: With nettyServer: " + saslNettyServer
@@ -83,7 +78,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
saslTokenMessageRequest = new SaslMessageToken(
saslNettyServer.response(new byte[0]));
// Send response to client.
- channel.write(saslTokenMessageRequest);
+ channel.writeAndFlush(saslTokenMessageRequest, channel.voidPromise());
// do not send upstream to other handlers: no further action needs
// to be done for SASL_TOKEN_MESSAGE_REQUEST requests.
return;
@@ -93,8 +88,7 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
// initialize server-side SASL functionality, if we haven't yet
// (in which case we are looking at the first SASL message from the
// client).
- SaslNettyServer saslNettyServer = SaslNettyServerState.getSaslNettyServer
- .get(channel);
+ SaslNettyServer saslNettyServer = channel.attr(SaslNettyServerState.SASL_NETTY_SERVER).get();
if (saslNettyServer == null) {
throw new Exception("saslNettyServer was unexpectedly "
+ "null for channel: " + channel);
@@ -104,17 +98,17 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
.getSaslToken()));
// Send response to client.
- channel.write(saslTokenMessageRequest);
+ channel.writeAndFlush(saslTokenMessageRequest, channel.voidPromise());
if (saslNettyServer.isComplete()) {
// If authentication of client is complete, we will also send a
// SASL-Complete message to the client.
LOG.debug("SASL authentication is complete for client with "
+ "username: " + saslNettyServer.getUserName());
- channel.write(ControlMessage.SASL_COMPLETE_REQUEST);
+ channel.writeAndFlush(ControlMessage.SASL_COMPLETE_REQUEST, channel.voidPromise());
LOG.debug("Removing SaslServerHandler from pipeline since SASL "
+ "authentication is complete.");
- ctx.getPipeline().remove(this);
+ ctx.pipeline().remove(this);
server.authenticated(channel);
}
} else {
@@ -125,15 +119,13 @@ public class SaslStormServerHandler extends SimpleChannelUpstreamHandler {
// authentication has not completed.
LOG.warn("Sending upstream an unexpected non-SASL message : "
+ msg);
- Channels.fireMessageReceived(ctx, msg);
+ ctx.fireChannelRead(msg);
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
- if (server != null) {
- server.closeChannel(e.getChannel());
- }
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ ctx.close();
}
private void getSASLCredentials() throws IOException {
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
index d5b2c8a..6500abe 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/Server.java
@@ -13,14 +13,13 @@
package org.apache.storm.messaging.netty;
import java.net.InetSocketAddress;
-import java.util.Arrays;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
@@ -33,37 +32,45 @@ import org.apache.storm.metric.api.IMetric;
import org.apache.storm.metric.api.IStatefulObject;
import org.apache.storm.serialization.KryoValuesDeserializer;
import org.apache.storm.serialization.KryoValuesSerializer;
-import org.apache.storm.shade.org.jboss.netty.bootstrap.ServerBootstrap;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.group.ChannelGroup;
-import org.apache.storm.shade.org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.apache.storm.shade.io.netty.bootstrap.ServerBootstrap;
+import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelFuture;
+import org.apache.storm.shade.io.netty.channel.ChannelOption;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.group.ChannelGroup;
+import org.apache.storm.shade.io.netty.channel.group.DefaultChannelGroup;
+import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.socket.nio.NioServerSocketChannel;
+import org.apache.storm.shade.io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.storm.utils.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServer {
+ public static final int LOAD_METRICS_TASK_ID = -1;
+
private static final Logger LOG = LoggerFactory.getLogger(Server.class);
- final ChannelFactory factory;
- final ServerBootstrap bootstrap;
+ private final EventLoopGroup bossEventLoopGroup;
+ private final EventLoopGroup workerEventLoopGroup;
+ private final ServerBootstrap bootstrap;
private final ConcurrentHashMap<String, AtomicInteger> messagesEnqueued = new ConcurrentHashMap<>();
private final AtomicInteger messagesDequeued = new AtomicInteger(0);
private final int boundPort;
- Map<String, Object> topoConf;
- int port;
- volatile ChannelGroup allChannels = new DefaultChannelGroup("storm-server");
- KryoValuesSerializer _ser;
- KryoValuesDeserializer deser;
+ private final Map<String, Object> topoConf;
+ private final int port;
+ private final ChannelGroup allChannels = new DefaultChannelGroup("storm-server", GlobalEventExecutor.INSTANCE);
+ private final KryoValuesSerializer ser;
+ private final KryoValuesDeserializer deser;
private volatile boolean closing = false;
- private IConnectionCallback _cb = null;
+ private IConnectionCallback cb = null;
private Supplier<Object> newConnectionResponse;
Server(Map<String, Object> topoConf, int port) {
this.topoConf = topoConf;
this.port = port;
- _ser = new KryoValuesSerializer(topoConf);
+ ser = new KryoValuesSerializer(topoConf);
deser = new KryoValuesDeserializer(topoConf);
// Configure the server.
@@ -74,30 +81,33 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
ThreadFactory bossFactory = new NettyRenameThreadFactory(netty_name() + "-boss");
ThreadFactory workerFactory = new NettyRenameThreadFactory(netty_name() + "-worker");
- if (maxWorkers > 0) {
- factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory), maxWorkers);
- } else {
- factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory));
- }
+ bossEventLoopGroup = new NioEventLoopGroup(1, bossFactory);
+ // 0 means DEFAULT_EVENT_LOOP_THREADS
+ // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+ this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
LOG.info("Create Netty Server " + netty_name() + ", buffer_size: " + buffer_size + ", maxWorkers: " + maxWorkers);
- bootstrap = new ServerBootstrap(factory);
- bootstrap.setOption("child.tcpNoDelay", true);
- bootstrap.setOption("child.receiveBufferSize", buffer_size);
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.setOption("reuseAddress", true);
- bootstrap.setOption("backlog", backlog);
-
- // Set up the pipeline factory.
- bootstrap.setPipelineFactory(new StormServerPipelineFactory(this));
+ bootstrap = new ServerBootstrap()
+ .group(bossEventLoopGroup, workerEventLoopGroup)
+ .channel(NioServerSocketChannel.class)
+ .option(ChannelOption.SO_REUSEADDR, true)
+ .option(ChannelOption.SO_BACKLOG, backlog)
+ .childOption(ChannelOption.TCP_NODELAY, true)
+ .childOption(ChannelOption.SO_RCVBUF, buffer_size)
+ .childOption(ChannelOption.SO_KEEPALIVE, true)
+ .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .childHandler(new StormServerPipelineFactory(ser, deser, topoConf, this));
// Bind and start to accept incoming connections.
- Channel channel = bootstrap.bind(new InetSocketAddress(port));
- boundPort = ((InetSocketAddress) channel.getLocalAddress()).getPort();
- allChannels.add(channel);
+ try {
+ ChannelFuture bindFuture = bootstrap.bind(new InetSocketAddress(port)).sync();
+ Channel channel = bindFuture.channel();
+ boundPort = ((InetSocketAddress) channel.localAddress()).getPort();
+ allChannels.add(channel);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
}
private void addReceiveCount(String from, int amount) {
@@ -124,18 +134,18 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
* @throws InterruptedException
*/
protected void enqueue(List<TaskMessage> msgs, String from) throws InterruptedException {
- if (null == msgs || msgs.size() == 0 || closing) {
+ if (null == msgs || msgs.isEmpty() || closing) {
return;
}
addReceiveCount(from, msgs.size());
- if (_cb != null) {
- _cb.recv(msgs);
+ if (cb != null) {
+ cb.recv(msgs);
}
}
@Override
public void registerRecv(IConnectionCallback cb) {
- _cb = cb;
+ this.cb = cb;
}
@Override
@@ -143,14 +153,6 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
this.newConnectionResponse = newConnectionResponse;
}
- /**
- * @param channel channel to close
- */
- public void closeChannel(Channel channel) {
- channel.close().awaitUninterruptibly();
- allChannels.remove(channel);
- }
-
@Override
public int getPort() {
return boundPort;
@@ -159,26 +161,25 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
/**
* close all channels, and release resources
*/
- public synchronized void close() {
- if (allChannels != null) {
- allChannels.close().awaitUninterruptibly();
- factory.releaseExternalResources();
- allChannels = null;
- }
+ @Override
+ public void close() {
+ allChannels.close().awaitUninterruptibly();
+ workerEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
+ bossEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
}
@Override
- synchronized public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
+ public void sendLoadMetrics(Map<Integer, Double> taskToLoad) {
MessageBatch mb = new MessageBatch(1);
- mb.add(new TaskMessage(-1, _ser.serialize(Arrays.asList((Object) taskToLoad))));
- allChannels.write(mb);
+ mb.add(new TaskMessage(LOAD_METRICS_TASK_ID, ser.serialize(Collections.singletonList((Object) taskToLoad))));
+ allChannels.writeAndFlush(mb);
}
// this method expected to be thread safe
@Override
- synchronized public void sendBackPressureStatus(BackPressureStatus bpStatus) {
+ public void sendBackPressureStatus(BackPressureStatus bpStatus) {
LOG.info("Sending BackPressure status update to connected workers. BPStatus = {}", bpStatus);
- allChannels.write(bpStatus);
+ allChannels.writeAndFlush(bpStatus);
}
@Override
@@ -187,16 +188,11 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
}
@Override
- public void send(int task, byte[] message) {
- throw new UnsupportedOperationException("Server connection should not send any messages");
- }
-
- @Override
public void send(Iterator<TaskMessage> msgs) {
throw new UnsupportedOperationException("Server connection should not send any messages");
}
- public String netty_name() {
+ public final String netty_name() {
return "Netty-server-localhost-" + port;
}
@@ -212,7 +208,7 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
}
private boolean connectionEstablished(Channel channel) {
- return channel != null && channel.isBound();
+ return channel != null && channel.isActive();
}
private boolean connectionEstablished(ChannelGroup allChannels) {
@@ -226,11 +222,12 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
return allEstablished;
}
+ @Override
public Object getState() {
LOG.debug("Getting metrics for server on port {}", port);
HashMap<String, Object> ret = new HashMap<>();
ret.put("dequeuedMessages", messagesDequeued.getAndSet(0));
- HashMap<String, Integer> enqueued = new HashMap<String, Integer>();
+ HashMap<String, Integer> enqueued = new HashMap<>();
Iterator<Map.Entry<String, AtomicInteger>> it = messagesEnqueued.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<String, AtomicInteger> ent = it.next();
@@ -245,8 +242,8 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
ret.put("enqueued", enqueued);
// Report messageSizes metric, if enabled (non-null).
- if (_cb instanceof IMetric) {
- Object metrics = ((IMetric) _cb).getValueAndReset();
+ if (cb instanceof IMetric) {
+ Object metrics = ((IMetric) cb).getValueAndReset();
if (metrics instanceof Map) {
ret.put("messageBytes", metrics);
}
@@ -258,28 +255,32 @@ class Server extends ConnectionWithStatus implements IStatefulObject, ISaslServe
/**
* Implementing IServer.
**/
- public void channelConnected(Channel c) {
+ @Override
+ public void channelActive(Channel c) {
if (newConnectionResponse != null) {
- c.write(newConnectionResponse.get()); // not synchronized since it is not yet in channel grp, so pvt to this thread
+ c.writeAndFlush(newConnectionResponse.get(), c.voidPromise());
}
allChannels.add(c);
}
+ @Override
public void received(Object message, String remote, Channel channel) throws InterruptedException {
List<TaskMessage> msgs = (List<TaskMessage>) message;
enqueue(msgs, remote);
}
+ @Override
public String name() {
return (String) topoConf.get(Config.TOPOLOGY_NAME);
}
+ @Override
public String secretKey() {
return SaslUtils.getSecretKey(topoConf);
}
+ @Override
public void authenticated(Channel c) {
- return;
}
@Override
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
index 9b63346..4c38344 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientHandler.java
@@ -18,19 +18,16 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.messaging.TaskMessage;
import org.apache.storm.serialization.KryoValuesDeserializer;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StormClientHandler extends SimpleChannelUpstreamHandler {
+public class StormClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(StormClientHandler.class);
- private Client client;
- private KryoValuesDeserializer _des;
- private AtomicBoolean[] remoteBpStatus;
+ private final Client client;
+ private final KryoValuesDeserializer _des;
+ private final AtomicBoolean[] remoteBpStatus;
StormClientHandler(Client client, AtomicBoolean[] remoteBpStatus, Map<String, Object> conf) {
this.client = client;
@@ -39,9 +36,8 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
+ public void channelRead(ChannelHandlerContext ctx, Object message) throws Exception {
//examine the response message from server
- Object message = event.getMessage();
if (message instanceof ControlMessage) {
ControlMessage msg = (ControlMessage) message;
if (msg == ControlMessage.FAILURE_RESPONSE) {
@@ -61,16 +57,17 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
}
LOG.debug("Received BackPressure status update : {}", status);
} else if (message instanceof List) {
- //This should be the metrics, and there should only be one of them
+ //This should be the load metrics.
+ //There will usually only be one message, but if there are multiple we only process the latest one.
List<TaskMessage> list = (List<TaskMessage>) message;
if (list.size() < 1) {
throw new RuntimeException("Didn't see enough load metrics (" + client.getDstAddress() + ") " + list);
}
- TaskMessage tm = ((List<TaskMessage>) message).get(list.size() - 1);
- if (tm.task() != -1) {
+ TaskMessage tm = list.get(list.size() - 1);
+ if (tm.task() != Server.LOAD_METRICS_TASK_ID) {
throw new RuntimeException("Metrics messages are sent to the system task (" + client.getDstAddress() + ") " + tm);
}
- List metrics = _des.deserialize(tm.message());
+ List<Object> metrics = _des.deserialize(tm.message());
if (metrics.size() < 1) {
throw new RuntimeException("No metrics data in the metrics message (" + client.getDstAddress() + ") " + metrics);
}
@@ -85,13 +82,7 @@ public class StormClientHandler extends SimpleChannelUpstreamHandler {
}
@Override
- public void channelInterestChanged(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
- client.notifyInterestChanged(e.getChannel());
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
- Throwable cause = event.getCause();
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
if (!(cause instanceof ConnectException)) {
LOG.info("Connection to " + client.getDstAddress() + " failed:", cause);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
index 1808d3e..2a790ef 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormClientPipelineFactory.java
@@ -15,14 +15,14 @@ package org.apache.storm.messaging.netty;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.storm.Config;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
-class StormClientPipelineFactory implements ChannelPipelineFactory {
- private Client client;
- private AtomicBoolean[] remoteBpStatus;
- private Map<String, Object> conf;
+class StormClientPipelineFactory extends ChannelInitializer<Channel> {
+ private final Client client;
+ private final AtomicBoolean[] remoteBpStatus;
+ private final Map<String, Object> conf;
StormClientPipelineFactory(Client client, AtomicBoolean[] remoteBpStatus, Map<String, Object> conf) {
this.client = client;
@@ -30,14 +30,15 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
this.conf = conf;
}
- public ChannelPipeline getPipeline() throws Exception {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
// Create a default pipeline implementation.
- ChannelPipeline pipeline = Channels.pipeline();
+ ChannelPipeline pipeline = ch.pipeline();
// Decoder
pipeline.addLast("decoder", new MessageDecoder(client.deser));
// Encoder
- pipeline.addLast("encoder", new MessageEncoder(client.ser));
+ pipeline.addLast("encoder", NettySerializableMessageEncoder.INSTANCE);
boolean isNettyAuth = (Boolean) conf
.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
@@ -48,6 +49,5 @@ class StormClientPipelineFactory implements ChannelPipelineFactory {
}
// business logic.
pipeline.addLast("handler", new StormClientHandler(client, remoteBpStatus, conf));
- return pipeline;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
index 7138f94..55e7058 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerHandler.java
@@ -17,22 +17,18 @@ import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class StormServerHandler extends SimpleChannelUpstreamHandler {
+public class StormServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(StormServerHandler.class);
- private static final Set<Class> allowedExceptions = new HashSet<>(Arrays.asList(new Class[]{ IOException.class }));
- IServer server;
- private AtomicInteger failure_count;
- private Channel channel;
+ private static final Set<Class> ALLOWED_EXCEPTIONS = new HashSet<>(Arrays.asList(new Class[]{ IOException.class }));
+ private final IServer server;
+ private final AtomicInteger failure_count;
public StormServerHandler(IServer server) {
this.server = server;
@@ -40,40 +36,35 @@ public class StormServerHandler extends SimpleChannelUpstreamHandler {
}
@Override
- public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) {
- server.channelConnected(e.getChannel());
- if (channel != null) {
- LOG.debug("Replacing channel with new channel: {} -> ",
- channel, e.getChannel());
- }
- channel = e.getChannel();
- server.channelConnected(channel);
+ public void channelActive(ChannelHandlerContext ctx) {
+ server.channelActive(ctx.channel());
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
- Object msgs = e.getMessage();
- if (msgs == null) {
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ if (msg == null) {
return;
}
+ Channel channel = ctx.channel();
try {
- server.received(msgs, e.getRemoteAddress().toString(), channel);
- } catch (InterruptedException e1) {
+ server.received(msg, channel.remoteAddress().toString(), channel);
+ } catch (InterruptedException e) {
LOG.info("failed to enqueue a request message", e);
failure_count.incrementAndGet();
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
try {
- LOG.error("server errors in handling the request", e.getCause());
+ LOG.error("server errors in handling the request", cause);
} catch (Throwable err) {
// Doing nothing (probably due to an oom issue) and hoping Utils.handleUncaughtException will handle it
}
try {
- Utils.handleUncaughtException(e.getCause(), allowedExceptions);
+ Utils.handleUncaughtException(cause, ALLOWED_EXCEPTIONS);
+ ctx.close();
} catch (Error error) {
LOG.info("Received error in netty thread.. terminating server...");
Runtime.getRuntime().exit(1);
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
index 068843f..6e22fb7 100644
--- a/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
+++ b/storm-client/src/jvm/org/apache/storm/messaging/netty/StormServerPipelineFactory.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
@@ -12,28 +13,41 @@
package org.apache.storm.messaging.netty;
+import java.util.Map;
import org.apache.storm.Config;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
-
-class StormServerPipelineFactory implements ChannelPipelineFactory {
- private Server server;
-
- StormServerPipelineFactory(Server server) {
+import org.apache.storm.serialization.KryoValuesDeserializer;
+import org.apache.storm.serialization.KryoValuesSerializer;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
+
+class StormServerPipelineFactory extends ChannelInitializer<Channel> {
+
+ private final KryoValuesSerializer ser;
+ private final KryoValuesDeserializer deser;
+ private final Map<String, Object> topoConf;
+ private final Server server;
+
+ StormServerPipelineFactory(KryoValuesSerializer ser, KryoValuesDeserializer deser,
+ Map<String, Object> topoConf, Server server) {
+ this.ser = ser;
+ this.deser = deser;
+ this.topoConf = topoConf;
this.server = server;
}
- public ChannelPipeline getPipeline() throws Exception {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
// Create a default pipeline implementation.
- ChannelPipeline pipeline = Channels.pipeline();
+ ChannelPipeline pipeline = ch.pipeline();
// Decoder
- pipeline.addLast("decoder", new MessageDecoder(server.deser));
- // Encoder
- pipeline.addLast("encoder", new MessageEncoder(server._ser));
+ pipeline.addLast("decoder", new MessageDecoder(deser));
+ // Encoders
+ pipeline.addLast("netty-serializable-encoder", NettySerializableMessageEncoder.INSTANCE);
+ pipeline.addLast("backpressure-encoder", new BackPressureStatusEncoder(ser));
- boolean isNettyAuth = (Boolean) this.server.topoConf
+ boolean isNettyAuth = (Boolean) topoConf
.get(Config.STORM_MESSAGING_NETTY_AUTHENTICATION);
if (isNettyAuth) {
// Authenticate: Removed after authentication completes
@@ -41,11 +55,9 @@ class StormServerPipelineFactory implements ChannelPipelineFactory {
server));
// Authorize
pipeline.addLast("authorizeServerHandler",
- new SaslStormServerAuthorizeHandler());
+ new SaslStormServerAuthorizeHandler());
}
// business logic.
pipeline.addLast("handler", new StormServerHandler(server));
-
- return pipeline;
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
index ccaa0f8..c24d8db 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClient.java
@@ -16,7 +16,6 @@ import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Timer;
import java.util.TimerTask;
-import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -28,10 +27,14 @@ import org.apache.storm.messaging.netty.ISaslClient;
import org.apache.storm.messaging.netty.NettyRenameThreadFactory;
import org.apache.storm.pacemaker.codec.ThriftNettyClientCodec;
import org.apache.storm.security.auth.ClientAuthUtils;
-import org.apache.storm.shade.org.jboss.netty.bootstrap.ClientBootstrap;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.apache.storm.shade.io.netty.bootstrap.Bootstrap;
+import org.apache.storm.shade.io.netty.buffer.PooledByteBufAllocator;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelOption;
+import org.apache.storm.shade.io.netty.channel.EventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.WriteBufferWaterMark;
+import org.apache.storm.shade.io.netty.channel.nio.NioEventLoopGroup;
+import org.apache.storm.shade.io.netty.channel.socket.nio.NioSocketChannel;
import org.apache.storm.utils.StormBoundedExponentialBackoffRetry;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -40,7 +43,8 @@ public class PacemakerClient implements ISaslClient {
private static final Logger LOG = LoggerFactory.getLogger(PacemakerClient.class);
private static Timer timer = new Timer(true);
- private final ClientBootstrap bootstrap;
+ private final Bootstrap bootstrap;
+ private final EventLoopGroup workerEventLoopGroup;
private String client_name;
private String secret;
private AtomicBoolean ready;
@@ -56,11 +60,6 @@ public class PacemakerClient implements ISaslClient {
private StormBoundedExponentialBackoffRetry backoff = new StormBoundedExponentialBackoffRetry(100, 5000, 20);
private int retryTimes = 0;
- //the constructor is invoked by pacemaker-state-factory-test
- public PacemakerClient() {
- bootstrap = new ClientBootstrap();
- }
-
public PacemakerClient(Map<String, Object> config, String host) {
int port = (int) config.get(Config.PACEMAKER_PORT);
@@ -68,9 +67,9 @@ public class PacemakerClient implements ISaslClient {
if (client_name == null) {
client_name = "pacemaker-client";
}
+ int maxWorkers = (int)config.get(Config.PACEMAKER_CLIENT_MAX_THREADS);
String auth = (String) config.get(Config.PACEMAKER_AUTH_METHOD);
- ThriftNettyClientCodec.AuthMethod authMethod;
switch (auth) {
@@ -100,23 +99,25 @@ public class PacemakerClient implements ISaslClient {
ready = new AtomicBoolean(false);
shutdown = new AtomicBoolean(false);
- channelRef = new AtomicReference<Channel>(null);
+ channelRef = new AtomicReference<>(null);
setupMessaging();
- ThreadFactory bossFactory = new NettyRenameThreadFactory("client-boss");
ThreadFactory workerFactory = new NettyRenameThreadFactory("client-worker");
- NioClientSocketChannelFactory factory =
- new NioClientSocketChannelFactory(Executors.newCachedThreadPool(bossFactory), Executors.newCachedThreadPool(workerFactory));
- bootstrap = new ClientBootstrap(factory);
- bootstrap.setOption("tcpNoDelay", true);
- bootstrap.setOption("sendBufferSize", 5242880);
- bootstrap.setOption("keepAlive", true);
+ // 0 means DEFAULT_EVENT_LOOP_THREADS
+ // https://github.com/netty/netty/blob/netty-4.1.24.Final/transport/src/main/java/io/netty/channel/MultithreadEventLoopGroup.java#L40
+ this.workerEventLoopGroup = new NioEventLoopGroup(maxWorkers > 0 ? maxWorkers : 0, workerFactory);
+ int thriftMessageMaxSize = (Integer) config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
+ bootstrap = new Bootstrap()
+ .group(workerEventLoopGroup)
+ .channel(NioSocketChannel.class)
+ .option(ChannelOption.TCP_NODELAY, true)
+ .option(ChannelOption.SO_SNDBUF, 5242880)
+ .option(ChannelOption.SO_KEEPALIVE, true)
+ .option(ChannelOption.WRITE_BUFFER_WATER_MARK, new WriteBufferWaterMark(8 * 1024, 32 * 1024))
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
+ .handler(new ThriftNettyClientCodec(this, config, authMethod, host, thriftMessageMaxSize));
remote_addr = new InetSocketAddress(host, port);
- int thriftMessageMaxSize = (Integer) config.get(Config.PACEMAKER_THRIFT_MESSAGE_SIZE_MAX);
- ChannelPipelineFactory pipelineFactory =
- new ThriftNettyClientCodec(this, config, authMethod, host, thriftMessageMaxSize).pipelineFactory();
- bootstrap.setPipelineFactory(pipelineFactory);
bootstrap.connect(remote_addr);
}
@@ -129,27 +130,16 @@ public class PacemakerClient implements ISaslClient {
}
@Override
- public synchronized void channelConnected(Channel channel) {
+ public synchronized void channelReady(Channel channel) {
Channel oldChannel = channelRef.get();
if (oldChannel != null) {
LOG.debug("Closing oldChannel is connected: {}", oldChannel.toString());
close_channel();
}
- LOG.debug("Channel is connected: {}", channel.toString());
channelRef.set(channel);
-
- //If we're not going to authenticate, we can begin sending.
- if (authMethod == ThriftNettyClientCodec.AuthMethod.NONE) {
- ready.set(true);
- this.notifyAll();
- }
retryTimes = 0;
- }
-
- @Override
- public synchronized void channelReady() {
- LOG.debug("Channel is ready.");
+ LOG.debug("Channel is ready: {}", channel.toString());
ready.set(true);
this.notifyAll();
}
@@ -176,7 +166,7 @@ public class PacemakerClient implements ISaslClient {
waitUntilReady();
Channel channel = channelRef.get();
if (channel != null) {
- channel.write(m);
+ channel.writeAndFlush(m, channel.voidPromise());
m.wait(1000);
}
if (messages[next] != m && messages[next] != null) {
@@ -257,7 +247,7 @@ public class PacemakerClient implements ISaslClient {
public void shutdown() {
shutdown.set(true);
- bootstrap.releaseExternalResources();
+ workerEventLoopGroup.shutdownGracefully().awaitUninterruptibly();
}
private synchronized void close_channel() {
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
index 49525db..b81f02d 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/PacemakerClientHandler.java
@@ -15,57 +15,49 @@ package org.apache.storm.pacemaker;
import java.net.ConnectException;
import org.apache.storm.generated.HBMessage;
import org.apache.storm.messaging.netty.ControlMessage;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelStateEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.ExceptionEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.MessageEvent;
-import org.apache.storm.shade.org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class PacemakerClientHandler extends SimpleChannelUpstreamHandler {
+public class PacemakerClientHandler extends ChannelInboundHandlerAdapter {
private static final Logger LOG = LoggerFactory.getLogger(PacemakerClientHandler.class);
- private PacemakerClient client;
+ private final PacemakerClient client;
public PacemakerClientHandler(PacemakerClient client) {
this.client = client;
}
@Override
- public void channelConnected(ChannelHandlerContext ctx,
- ChannelStateEvent event) {
+ public void channelActive(ChannelHandlerContext ctx) {
// register the newly established channel
- Channel channel = ctx.getChannel();
- client.channelConnected(channel);
-
+ Channel channel = ctx.channel();
LOG.info("Connection established from {} to {}",
- channel.getLocalAddress(), channel.getRemoteAddress());
+ channel.localAddress(), channel.remoteAddress());
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent event) {
- LOG.debug("Got Message: {}", event.getMessage().toString());
- Object evm = event.getMessage();
+ public void channelRead(ChannelHandlerContext ctx, Object message) {
+ LOG.debug("Got Message: {}", message.toString());
- if (evm instanceof ControlMessage) {
- LOG.debug("Got control message: {}", evm.toString());
+ if (message instanceof ControlMessage) {
+ LOG.debug("Got control message: {}", message.toString());
return;
- } else if (evm instanceof HBMessage) {
- client.gotMessage((HBMessage) evm);
+ } else if (message instanceof HBMessage) {
+ client.gotMessage((HBMessage) message);
} else {
- LOG.warn("Got unexpected message: {} from server.", evm);
+ LOG.warn("Got unexpected message: {} from server.", message);
}
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent event) {
- Throwable t = event.getCause();
- if (t instanceof ConnectException) {
- LOG.warn("Connection to pacemaker failed. Trying to reconnect {}", t.getMessage());
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ if (cause instanceof ConnectException) {
+ LOG.warn("Connection to pacemaker failed. Trying to reconnect {}", cause.getMessage());
} else {
- LOG.error("Exception occurred in Pacemaker.", t);
+ LOG.error("Exception occurred in Pacemaker.", cause);
}
client.reconnect();
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
index 2d9d5c9..c88ecf2 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftDecoder.java
@@ -13,17 +13,17 @@
package org.apache.storm.pacemaker.codec;
import java.io.IOException;
+import java.util.List;
import org.apache.storm.generated.HBMessage;
import org.apache.storm.generated.HBServerMessageType;
import org.apache.storm.messaging.netty.ControlMessage;
import org.apache.storm.messaging.netty.SaslMessageToken;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.frame.FrameDecoder;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.ByteToMessageDecoder;
import org.apache.storm.utils.Utils;
-public class ThriftDecoder extends FrameDecoder {
+public class ThriftDecoder extends ByteToMessageDecoder {
private static final int INTEGER_SIZE = 4;
@@ -40,11 +40,10 @@ public class ThriftDecoder extends FrameDecoder {
}
@Override
- protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buf) throws Exception {
-
+ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buf, List<Object> out) throws Exception {
long available = buf.readableBytes();
if (available < INTEGER_SIZE) {
- return null;
+ return;
}
buf.markReaderIndex();
@@ -61,7 +60,7 @@ public class ThriftDecoder extends FrameDecoder {
if (available < thriftLen) {
// We haven't received the entire object yet, return and wait for more bytes.
buf.resetReaderIndex();
- return null;
+ return;
}
byte serialized[] = new byte[thriftLen];
@@ -70,12 +69,12 @@ public class ThriftDecoder extends FrameDecoder {
if (m.get_type() == HBServerMessageType.CONTROL_MESSAGE) {
ControlMessage cm = ControlMessage.read(m.get_data().get_message_blob());
- return cm;
+ out.add(cm);
} else if (m.get_type() == HBServerMessageType.SASL_MESSAGE_TOKEN) {
SaslMessageToken sm = SaslMessageToken.read(m.get_data().get_message_blob());
- return sm;
+ out.add(sm);
} else {
- return m;
+ out.add(m);
}
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
index d726159..9dffd04 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftEncoder.java
@@ -12,59 +12,56 @@
package org.apache.storm.pacemaker.codec;
-import java.io.IOException;
+import java.util.List;
import org.apache.storm.generated.HBMessage;
import org.apache.storm.generated.HBMessageData;
import org.apache.storm.generated.HBServerMessageType;
import org.apache.storm.messaging.netty.ControlMessage;
import org.apache.storm.messaging.netty.INettySerializable;
import org.apache.storm.messaging.netty.SaslMessageToken;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffer;
-import org.apache.storm.shade.org.jboss.netty.buffer.ChannelBuffers;
-import org.apache.storm.shade.org.jboss.netty.channel.Channel;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelHandlerContext;
-import org.apache.storm.shade.org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
+import org.apache.storm.shade.io.netty.buffer.ByteBuf;
+import org.apache.storm.shade.io.netty.buffer.ByteBufAllocator;
+import org.apache.storm.shade.io.netty.buffer.Unpooled;
+import org.apache.storm.shade.io.netty.channel.ChannelHandlerContext;
+import org.apache.storm.shade.io.netty.handler.codec.MessageToMessageEncoder;
import org.apache.storm.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ThriftEncoder extends OneToOneEncoder {
+public class ThriftEncoder extends MessageToMessageEncoder<Object> {
private static final Logger LOG = LoggerFactory
.getLogger(ThriftEncoder.class);
- private HBMessage encodeNettySerializable(INettySerializable netty_message,
- HBServerMessageType mType) {
+ private HBMessage encodeNettySerializable(ByteBufAllocator alloc,
+ INettySerializable netty_message, HBServerMessageType mType) {
HBMessageData message_data = new HBMessageData();
HBMessage m = new HBMessage();
+ byte[] messageBuffer = new byte[netty_message.encodeLength()];
+ ByteBuf wrappedBuffer = Unpooled.wrappedBuffer(messageBuffer);
try {
- ChannelBuffer cbuffer = netty_message.buffer();
- if (cbuffer.hasArray()) {
- message_data.set_message_blob(cbuffer.array());
- } else {
- byte buff[] = new byte[netty_message.encodeLength()];
- cbuffer.readBytes(buff, 0, netty_message.encodeLength());
- message_data.set_message_blob(buff);
- }
+ netty_message.write(wrappedBuffer);
+
+ message_data.set_message_blob(messageBuffer);
m.set_type(mType);
m.set_data(message_data);
return m;
- } catch (IOException e) {
- LOG.error("Failed to encode NettySerializable: ", e);
- throw new RuntimeException(e);
+ } finally {
+ wrappedBuffer.release();
}
}
@Override
- protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) {
+ protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List<Object> out) throws Exception {
if (msg == null) {
- return null;
+ return;
}
LOG.debug("Trying to encode: " + msg.getClass().toString() + " : " + msg.toString());
HBMessage m;
+ ByteBufAllocator alloc = channelHandlerContext.alloc();
if (msg instanceof INettySerializable) {
INettySerializable nettyMsg = (INettySerializable) msg;
@@ -77,19 +74,19 @@ public class ThriftEncoder extends OneToOneEncoder {
LOG.error("Didn't recognise INettySerializable: " + nettyMsg.toString());
throw new RuntimeException("Unrecognized INettySerializable.");
}
- m = encodeNettySerializable(nettyMsg, type);
+ m = encodeNettySerializable(alloc, nettyMsg, type);
} else {
m = (HBMessage) msg;
}
try {
byte serialized[] = Utils.thriftSerialize(m);
- ChannelBuffer ret = ChannelBuffers.directBuffer(serialized.length + 4);
+ ByteBuf ret = alloc.ioBuffer(serialized.length + 4);
ret.writeInt(serialized.length);
ret.writeBytes(serialized);
- return ret;
+ out.add(ret);
} catch (RuntimeException e) {
LOG.error("Failed to serialize.", e);
throw e;
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
index 853ce74..00e1bc9 100644
--- a/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
+++ b/storm-client/src/jvm/org/apache/storm/pacemaker/codec/ThriftNettyClientCodec.java
@@ -19,25 +19,24 @@ import org.apache.storm.messaging.netty.SaslStormClientHandler;
import org.apache.storm.pacemaker.PacemakerClient;
import org.apache.storm.pacemaker.PacemakerClientHandler;
import org.apache.storm.security.auth.ClientAuthUtils;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipeline;
-import org.apache.storm.shade.org.jboss.netty.channel.ChannelPipelineFactory;
-import org.apache.storm.shade.org.jboss.netty.channel.Channels;
+import org.apache.storm.shade.io.netty.channel.Channel;
+import org.apache.storm.shade.io.netty.channel.ChannelInitializer;
+import org.apache.storm.shade.io.netty.channel.ChannelPipeline;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ThriftNettyClientCodec {
+public class ThriftNettyClientCodec extends ChannelInitializer<Channel> {
public static final String SASL_HANDLER = "sasl-handler";
public static final String KERBEROS_HANDLER = "kerberos-handler";
private static final Logger LOG = LoggerFactory
.getLogger(ThriftNettyClientCodec.class);
- ;
private final int thriftMessageMaxSize;
- private PacemakerClient client;
- private AuthMethod authMethod;
- private Map<String, Object> topoConf;
- private String host;
+ private final PacemakerClient client;
+ private final AuthMethod authMethod;
+ private final Map<String, Object> topoConf;
+ private final String host;
public ThriftNettyClientCodec(PacemakerClient pacemaker_client, Map<String, Object> topoConf,
AuthMethod authMethod, String host, int thriftMessageMaxSizeBytes) {
@@ -48,10 +47,9 @@ public class ThriftNettyClientCodec {
thriftMessageMaxSize = thriftMessageMaxSizeBytes;
}
- public ChannelPipelineFactory pipelineFactory() {
- return new ChannelPipelineFactory() {
- public ChannelPipeline getPipeline() {
- ChannelPipeline pipeline = Channels.pipeline();
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast("encoder", new ThriftEncoder());
pipeline.addLast("decoder", new ThriftDecoder(thriftMessageMaxSize));
@@ -74,13 +72,10 @@ public class ThriftNettyClientCodec {
throw new RuntimeException(e);
}
} else {
- client.channelReady();
+ client.channelReady(ch);
}
pipeline.addLast("PacemakerClientHandler", new PacemakerClientHandler(client));
- return pipeline;
- }
- };
}
public enum AuthMethod {
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
index 3d41766..540d691 100644
--- a/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
+++ b/storm-client/src/jvm/org/apache/storm/utils/TransferDrainer.java
@@ -1,3 +1,4 @@
+
/**
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership. The ASF licenses this file to you under the Apache License, Version
@@ -17,6 +18,7 @@ import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;
+import java.util.stream.Stream;
import org.apache.storm.generated.NodeInfo;
import org.apache.storm.messaging.IConnection;
import org.apache.storm.messaging.TaskMessage;
@@ -26,7 +28,7 @@ import org.slf4j.LoggerFactory;
public class TransferDrainer {
private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
- private Map<Integer, ArrayList<TaskMessage>> bundles = new HashMap();
+ private final Map<Integer, ArrayList<TaskMessage>> bundles = new HashMap<>();
// Cache the msgs grouped by destination node
public void add(TaskMessage taskMsg) {
@@ -40,26 +42,24 @@ public class TransferDrainer {
}
public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, IConnection> connections) {
- HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);
+ HashMap<NodeInfo, Stream<TaskMessage>> bundleMapByDestination = groupBundleByDestination(taskToNode);
- for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) {
+ for (Map.Entry<NodeInfo, Stream<TaskMessage>> entry : bundleMapByDestination.entrySet()) {
NodeInfo node = entry.getKey();
IConnection conn = connections.get(node);
if (conn != null) {
- ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue();
- Iterator<TaskMessage> iter = getBundleIterator(bundle);
- if (null != iter && iter.hasNext()) {
+ Iterator<TaskMessage> iter = entry.getValue().iterator();
+ if (iter.hasNext()) {
conn.send(iter);
}
- entry.getValue().clear();
} else {
LOG.warn("Connection not available for hostPort {}", node);
}
}
}
- private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
- HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> result = new HashMap<>();
+ private HashMap<NodeInfo, Stream<TaskMessage>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
+ HashMap<NodeInfo, Stream<TaskMessage>> result = new HashMap<>();
for (Entry<Integer, ArrayList<TaskMessage>> entry : bundles.entrySet()) {
if (entry.getValue().isEmpty()) {
@@ -67,12 +67,7 @@ public class TransferDrainer {
}
NodeInfo node = taskToNode.get(entry.getKey());
if (node != null) {
- ArrayList<ArrayList<TaskMessage>> msgs = result.get(node);
- if (msgs == null) {
- msgs = new ArrayList<>();
- result.put(node, msgs);
- }
- msgs.add(entry.getValue());
+ result.merge(node, entry.getValue().stream(), Stream::concat);
} else {
LOG.warn("No remote destination available for task {}", entry.getKey());
}
@@ -80,54 +75,6 @@ public class TransferDrainer {
return result;
}
- private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
-
- if (null == bundle) {
- return null;
- }
-
- return new Iterator<TaskMessage>() {
-
- private int offset = 0;
- private int size = 0;
- private int bundleOffset = 0;
- private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
-
- {
- for (ArrayList<TaskMessage> list : bundle) {
- size += list.size();
- }
- }
-
- @Override
- public boolean hasNext() {
- return offset < size;
- }
-
- @Override
- public TaskMessage next() {
- TaskMessage msg;
- if (iter.hasNext()) {
- msg = iter.next();
- } else {
- bundleOffset++;
- iter = bundle.get(bundleOffset).iterator();
- msg = iter.next();
- }
- if (null != msg) {
- offset++;
- }
- return msg;
- }
-
- @Override
- public void remove() {
- throw new RuntimeException("not supported");
- }
- };
- }
-
-
public void clear() {
for (ArrayList<TaskMessage> taskMessages : bundles.values()) {
taskMessages.clear();
http://git-wip-us.apache.org/repos/asf/storm/blob/e2308285/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
----------------------------------------------------------------------
diff --git a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
index f4031f1..39db4ef 100644
--- a/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
+++ b/storm-client/test/jvm/org/apache/storm/PaceMakerStateStorageFactoryTest.java
@@ -18,6 +18,10 @@
package org.apache.storm;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
@@ -30,18 +34,30 @@ import org.apache.storm.generated.HBPulse;
import org.apache.storm.generated.HBServerMessageType;
import org.apache.storm.pacemaker.PacemakerClient;
import org.apache.storm.pacemaker.PacemakerClientPool;
+import org.apache.storm.pacemaker.PacemakerConnectionException;
import org.apache.storm.utils.Utils;
import org.junit.Assert;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+@RunWith(MockitoJUnitRunner.class)
public class PaceMakerStateStorageFactoryTest {
- private PaceMakerClientProxy clientProxy;
+
+ @Captor
+ private ArgumentCaptor<HBMessage> hbMessageCaptor;
+
+ @Mock
+ private PacemakerClient clientMock;
private PacemakerClientPoolProxy clientPoolProxy;
private PaceMakerStateStorage stateStorage;
public void createPaceMakerStateStorage(HBServerMessageType messageType, HBMessageData messageData) throws Exception {
HBMessage response = new HBMessage(messageType, messageData);
- clientProxy = new PaceMakerClientProxy(response, null);
+ when(clientMock.send(any())).thenReturn(response);
clientPoolProxy = new PacemakerClientPoolProxy();
stateStorage = new PaceMakerStateStorage(clientPoolProxy, null);
}
@@ -50,7 +66,8 @@ public class PaceMakerStateStorageFactoryTest {
public void testSetWorkerHb() throws Exception {
createPaceMakerStateStorage(HBServerMessageType.SEND_PULSE_RESPONSE, null);
stateStorage.set_worker_hb("/foo", Utils.javaSerialize("data"), null);
- HBMessage sent = clientProxy.checkCaptured();
+ verify(clientMock).send(hbMessageCaptor.capture());
+ HBMessage sent = hbMessageCaptor.getValue();
HBPulse pulse = sent.get_data().get_pulse();
Assert.assertEquals(HBServerMessageType.SEND_PULSE, sent.get_type());
Assert.assertEquals("/foo", pulse.get_id());
@@ -67,7 +84,8 @@ public class PaceMakerStateStorageFactoryTest {
public void testDeleteWorkerHb() throws Exception {
createPaceMakerStateStorage(HBServerMessageType.DELETE_PATH_RESPONSE, null);
stateStorage.delete_worker_hb("/foo/bar");
- HBMessage sent = clientProxy.checkCaptured();
+ verify(clientMock).send(hbMessageCaptor.capture());
+ HBMessage sent = hbMessageCaptor.getValue();
Assert.assertEquals(HBServerMessageType.DELETE_PATH, sent.get_type());
Assert.assertEquals("/foo/bar", sent.get_data().get_path());
}
@@ -86,7 +104,8 @@ public class PaceMakerStateStorageFactoryTest {
hbPulse.set_details(Utils.serialize(cwh));
createPaceMakerStateStorage(HBServerMessageType.GET_PULSE_RESPONSE, HBMessageData.pulse(hbPulse));
stateStorage.get_worker_hb("/foo", false);
- HBMessage sent = clientProxy.checkCaptured();
+ verify(clientMock).send(hbMessageCaptor.capture());
+ HBMessage sent = hbMessageCaptor.getValue();
Assert.assertEquals(HBServerMessageType.GET_PULSE, sent.get_type());
Assert.assertEquals("/foo", sent.get_data().get_path());
}
@@ -107,7 +126,8 @@ public class PaceMakerStateStorageFactoryTest {
public void testGetWorkerHbChildren() throws Exception {
createPaceMakerStateStorage(HBServerMessageType.GET_ALL_NODES_FOR_PATH_RESPONSE, HBMessageData.nodes(new HBNodes()));
stateStorage.get_worker_hb_children("/foo", false);
- HBMessage sent = clientProxy.checkCaptured();
+ verify(clientMock).send(hbMessageCaptor.capture());
+ HBMessage sent = hbMessageCaptor.getValue();
Assert.assertEquals(HBServerMessageType.GET_ALL_NODES_FOR_PATH, sent.get_type());
Assert.assertEquals("/foo", sent.get_data().get_path());
}
@@ -118,42 +138,22 @@ public class PaceMakerStateStorageFactoryTest {
stateStorage.get_worker_hb_children("/foo", false);
}
- private class PaceMakerClientProxy extends PacemakerClient {
- private HBMessage response;
- private HBMessage captured;
-
- public PaceMakerClientProxy(HBMessage response, HBMessage captured) {
- this.response = response;
- this.captured = captured;
- }
-
- @Override
- public HBMessage send(HBMessage m) {
- captured = m;
- return response;
- }
-
- public HBMessage checkCaptured() {
- return captured;
- }
- }
-
private class PacemakerClientPoolProxy extends PacemakerClientPool {
public PacemakerClientPoolProxy() {
super(new HashMap<>());
}
public PacemakerClient getWriteClient() {
- return clientProxy;
+ return clientMock;
}
- public HBMessage send(HBMessage m) {
- return clientProxy.send(m);
+ public HBMessage send(HBMessage m) throws PacemakerConnectionException, InterruptedException {
+ return clientMock.send(m);
}
- public List<HBMessage> sendAll(HBMessage m) {
+ public List<HBMessage> sendAll(HBMessage m) throws PacemakerConnectionException, InterruptedException {
List<HBMessage> response = new ArrayList<>();
- response.add(clientProxy.send(m));
+ response.add(clientMock.send(m));
return response;
}
}