You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tez.apache.org by ab...@apache.org on 2021/04/23 10:09:30 UTC
[tez] branch branch-0.9 updated: TEZ-4157: ShuffleHandler: upgrade
to Netty4 and remove Netty3 dependency from tez (#118) (Laszlo Bodor
reviewed by Ashutosh Chauhan, Jonathan Turner Eagles)
This is an automated email from the ASF dual-hosted git repository.
abstractdog pushed a commit to branch branch-0.9
in repository https://gitbox.apache.org/repos/asf/tez.git
The following commit(s) were added to refs/heads/branch-0.9 by this push:
new 47914b5 TEZ-4157: ShuffleHandler: upgrade to Netty4 and remove Netty3 dependency from tez (#118) (Laszlo Bodor reviewed by Ashutosh Chauhan, Jonathan Turner Eagles)
47914b5 is described below
commit 47914b521f3a855cfa78db84eee75c100525d64b
Author: Bodor Laszlo <bo...@gmail.com>
AuthorDate: Fri Apr 23 11:43:31 2021 +0200
TEZ-4157: ShuffleHandler: upgrade to Netty4 and remove Netty3 dependency from tez (#118) (Laszlo Bodor reviewed by Ashutosh Chauhan, Jonathan Turner Eagles)
---
pom.xml | 36 +-
tez-ext-service-tests/pom.xml | 3 +-
.../apache/tez/shufflehandler/ShuffleHandler.java | 295 ++++++++-------
tez-plugins/tez-aux-services/pom.xml | 4 +-
.../tez/auxservices/FadvisedChunkedFile.java | 16 +-
.../apache/tez/auxservices/FadvisedFileRegion.java | 55 ++-
.../org/apache/tez/auxservices/ShuffleHandler.java | 395 ++++++++++-----------
.../apache/tez/auxservices/TestShuffleHandler.java | 176 +++++----
8 files changed, 497 insertions(+), 483 deletions(-)
diff --git a/pom.xml b/pom.xml
index d383a50..f03fc20 100644
--- a/pom.xml
+++ b/pom.xml
@@ -58,7 +58,7 @@
<clover.license>${user.home}/clover.license</clover.license>
<hadoop.version>2.7.2</hadoop.version>
- <netty.version>3.6.2.Final</netty.version>
+ <netty.version>4.0.52.Final</netty.version>
<pig.version>0.13.0</pig.version>
<slf4j.version>1.7.30</slf4j.version>
<protobuf.version>2.5.0</protobuf.version>
@@ -254,7 +254,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-all</artifactId>
<scope>compile</scope>
<version>${netty.version}</version>
</dependency>
@@ -333,12 +333,22 @@
<groupId>commons-el</groupId>
<artifactId>commons-el</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-auth</artifactId>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -555,6 +565,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-common</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -571,6 +585,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-shuffle</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -596,6 +614,10 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-common</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
@@ -604,6 +626,12 @@
<scope>test</scope>
<type>test-jar</type>
<version>${hadoop.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -642,6 +670,10 @@
<groupId>tomcat</groupId>
<artifactId>jasper-runtime</artifactId>
</exclusion>
+ <exclusion>
+ <groupId>io.netty</groupId>
+ <artifactId>netty</artifactId>
+ </exclusion>
</exclusions>
</dependency>
<dependency>
diff --git a/tez-ext-service-tests/pom.xml b/tez-ext-service-tests/pom.xml
index 15067d0..d16127b 100644
--- a/tez-ext-service-tests/pom.xml
+++ b/tez-ext-service-tests/pom.xml
@@ -32,8 +32,7 @@
<dependencies>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
- <version>3.6.2.Final</version>
+ <artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
diff --git a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
index 49dffe9..43f24ba 100644
--- a/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
+++ b/tez-ext-service-tests/src/test/java/org/apache/tez/shufflehandler/ShuffleHandler.java
@@ -14,17 +14,17 @@
package org.apache.tez.shufflehandler;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import javax.crypto.SecretKey;
import java.io.File;
@@ -41,15 +41,15 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import com.google.common.base.Charsets;
+
import org.apache.tez.common.Preconditions;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
@@ -64,36 +64,38 @@ import org.apache.tez.mapreduce.hadoop.MRConfig;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.runtime.library.common.shuffle.orderedgrouped.ShuffleHeader;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.DefaultFileRegion;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.util.CharsetUtil;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.DefaultFileRegion;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -110,9 +112,13 @@ public class ShuffleHandler {
Pattern.CASE_INSENSITIVE);
private int port;
- private final ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
- protected HttpPipelineFactory pipelineFact;
+
+ // pipeline items
+ private Shuffle SHUFFLE;
+
+ private NioEventLoopGroup bossGroup;
+ private NioEventLoopGroup workerGroup;
+ private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private final Configuration conf;
private final ConcurrentMap<String, Boolean> registeredApps = new ConcurrentHashMap<String, Boolean>();
@@ -171,17 +177,23 @@ public class ShuffleHandler {
maxShuffleThreads = 2 * Runtime.getRuntime().availableProcessors();
}
- ThreadFactory bossFactory = new ThreadFactoryBuilder()
- .setNameFormat("ShuffleHandler Netty Boss #%d")
- .build();
- ThreadFactory workerFactory = new ThreadFactoryBuilder()
- .setNameFormat("ShuffleHandler Netty Worker #%d")
- .build();
-
- selector = new NioServerSocketChannelFactory(
- Executors.newCachedThreadPool(bossFactory),
- Executors.newCachedThreadPool(workerFactory),
- maxShuffleThreads);
+ final String BOSS_THREAD_NAME_PREFIX = "ShuffleHandler Netty Boss #";
+ AtomicInteger bossThreadCounter = new AtomicInteger(0);
+ bossGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, BOSS_THREAD_NAME_PREFIX + bossThreadCounter.incrementAndGet());
+ }
+ });
+
+ final String WORKER_THREAD_NAME_PREFIX = "ShuffleHandler Netty Worker #";
+ AtomicInteger workerThreadCounter = new AtomicInteger(0);
+ workerGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, WORKER_THREAD_NAME_PREFIX + workerThreadCounter.incrementAndGet());
+ }
+ });
connectionKeepAliveEnabled =
conf.getBoolean(SHUFFLE_CONNECTION_KEEP_ALIVE_ENABLED,
@@ -199,22 +211,44 @@ public class ShuffleHandler {
public void start() throws Exception {
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- try {
- pipelineFact = new HttpPipelineFactory(conf);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- bootstrap.setPipelineFactory(pipelineFact);
+ ServerBootstrap bootstrap = new ServerBootstrap()
+ .channel(NioServerSocketChannel.class)
+ .group(bossGroup, workerGroup)
+ .localAddress(port);
+ initPipeline(bootstrap, conf);
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ Channel ch = bootstrap.bind().sync().channel();
accepted.add(ch);
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+ port = ((InetSocketAddress)ch.localAddress()).getPort();
conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
- pipelineFact.SHUFFLE.setPort(port);
+ SHUFFLE.setPort(port);
LOG.info("TezShuffleHandler" + " listening on port " + port);
}
+ private void initPipeline(ServerBootstrap bootstrap, Configuration conf) throws Exception {
+ SHUFFLE = getShuffle(conf);
+
+ if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
+ MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
+ throw new UnsupportedOperationException(
+ "SSL Shuffle is not currently supported for the test shuffle handler");
+ }
+
+ ChannelInitializer<NioSocketChannel> channelInitializer =
+ new ChannelInitializer<NioSocketChannel>() {
+ @Override
+ public void initChannel(NioSocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("chunking", new ChunkedWriteHandler());
+ pipeline.addLast("shuffle", SHUFFLE);
+ }
+ };
+ bootstrap.childHandler(channelInitializer);
+ }
+
public static void initializeAndStart(Configuration conf) throws Exception {
if (!initing.getAndSet(true)) {
INSTANCE = new ShuffleHandler(conf);
@@ -245,15 +279,13 @@ public class ShuffleHandler {
removeJobShuffleInfo(applicationIdString);
}
-
public void stop() throws Exception {
accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- if (selector != null) {
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
}
- if (pipelineFact != null) {
- pipelineFact.destroy();
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
}
}
@@ -261,7 +293,6 @@ public class ShuffleHandler {
return new Shuffle(conf);
}
-
private void addJobToken(String appIdString, String user,
Token<JobTokenIdentifier> jobToken) {
String jobIdString = appIdString.replace("application", "job");
@@ -280,40 +311,8 @@ public class ShuffleHandler {
userRsrc.remove(appIdString);
}
- class HttpPipelineFactory implements ChannelPipelineFactory {
-
- final Shuffle SHUFFLE;
-
- public HttpPipelineFactory(Configuration conf) throws Exception {
- SHUFFLE = getShuffle(conf);
- // TODO Setup SSL Shuffle
- if (conf.getBoolean(MRConfig.SHUFFLE_SSL_ENABLED_KEY,
- MRConfig.SHUFFLE_SSL_ENABLED_DEFAULT)) {
- throw new UnsupportedOperationException(
- "SSL Shuffle is not currently supported for the test shuffle handler");
- }
- }
-
- public void destroy() {
- }
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunking", new ChunkedWriteHandler());
- pipeline.addLast("shuffle", SHUFFLE);
- return pipeline;
- // TODO factor security manager into pipeline
- // TODO factor out encode/decode to permit binary shuffle
- // TODO factor out decode of index to permit alt. models
- }
-
- }
-
- class Shuffle extends SimpleChannelUpstreamHandler {
+ @Sharable
+ class Shuffle extends ChannelInboundHandlerAdapter {
private final Configuration conf;
private final IndexCache indexCache;
@@ -343,37 +342,36 @@ public class ShuffleHandler {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
+
if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
- LOG.info(String.format("Current number of shuffle connections (%d) is " +
- "greater than or equal to the max allowed shuffle connections (%d)",
+ LOG.info(String.format("Current number of shuffle connections (%d) is " +
+ "greater than or equal to the max allowed shuffle connections (%d)",
accepted.size(), maxShuffleConnections));
- evt.getChannel().close();
+ ctx.channel().close();
return;
}
- accepted.add(evt.getChannel());
- super.channelOpen(ctx, evt);
-
+ accepted.add(ctx.channel());
+ super.channelActive(ctx);
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelRead(ChannelHandlerContext ctx, Object message)
throws Exception {
- HttpRequest request = (HttpRequest) evt.getMessage();
+ HttpRequest request = (HttpRequest) message;
if (request.getMethod() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
// Check whether the shuffle version is compatible
if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
- request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+ request.headers().get(ShuffleHeader.HTTP_HEADER_NAME))
|| !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
- request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+ request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
}
- final Map<String,List<String>> q =
- new QueryStringDecoder(request.getUri()).getParameters();
+ final Map<String, List<String>> q = new QueryStringDecoder(request.getUri()).parameters();
final List<String> keepAliveList = q.get("keepAlive");
boolean keepAliveParam = false;
if (keepAliveList != null && keepAliveList.size() == 1) {
@@ -432,7 +430,7 @@ public class ShuffleHandler {
Map<String, MapOutputInfo> mapOutputInfoMap =
new HashMap<String, MapOutputInfo>();
- Channel ch = evt.getChannel();
+ Channel ch = ctx.channel();
String user = userRsrc.get(jobId);
// $x/$user/appcache/$appId/output/$mapId
@@ -444,13 +442,13 @@ public class ShuffleHandler {
populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
response, keepAliveParam, mapOutputInfoMap);
} catch(IOException e) {
- ch.write(response);
+ ch.writeAndFlush(response);
LOG.error("Shuffle error in populating headers :", e);
String errorMessage = getErrorMessage(e);
sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
return;
}
- ch.write(response);
+ ch.writeAndFlush(response);
// TODO refactor the following into the pipeline
ChannelFuture lastMap = null;
for (String mapId : mapIds) {
@@ -551,12 +549,12 @@ public class ShuffleHandler {
boolean keepAliveParam, long contentLength) {
if (!connectionKeepAliveEnabled && !keepAliveParam) {
LOG.info("Setting connection close header...");
- response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
+ response.headers().set(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
} else {
- response.setHeader(HttpHeaders.Names.CONTENT_LENGTH,
+ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH,
String.valueOf(contentLength));
- response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
- response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout="
+ response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ response.headers().set(HttpHeaders.Values.KEEP_ALIVE, "timeout="
+ connectionKeepAliveTimeOut);
LOG.info("Content Length in shuffle : " + contentLength);
}
@@ -584,7 +582,7 @@ public class ShuffleHandler {
String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
// hash from the fetcher
String urlHashStr =
- request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+ request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
if (urlHashStr == null) {
LOG.info("Missing header hash for " + appid);
throw new IOException("fetcher cannot be authenticated");
@@ -600,11 +598,11 @@ public class ShuffleHandler {
String reply =
SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
tokenSecret);
- response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+ response.headers().set(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
// Put shuffle version into http header
- response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+ response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
- response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+ response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
if (LOG.isDebugEnabled()) {
int len = reply.length();
@@ -621,7 +619,7 @@ public class ShuffleHandler {
new ShuffleHeader(mapId, info.getPartLength(), info.getRawLength(), reduce);
final DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
final File spillfile =
new File(mapOutputInfo.mapOutputFileName.toString());
RandomAccessFile spill;
@@ -634,15 +632,7 @@ public class ShuffleHandler {
ChannelFuture writeFuture;
final DefaultFileRegion partition =
new DefaultFileRegion(spill.getChannel(), info.getStartOffset(), info.getPartLength());
- writeFuture = ch.write(partition);
- writeFuture.addListener(new ChannelFutureListener() {
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- partition.releaseExternalResources();
- }
- });
+ writeFuture = ch.writeAndFlush(partition);
return writeFuture;
}
@@ -653,25 +643,22 @@ public class ShuffleHandler {
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status);
+ response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Put shuffle version into http header
- response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+ response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
- response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+ response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- response.setContent(
- ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+ response.content().writeBytes(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
if (cause instanceof TooLongFrameException) {
sendError(ctx, BAD_REQUEST);
return;
@@ -688,8 +675,8 @@ public class ShuffleHandler {
}
LOG.error("Shuffle error: ", cause);
- if (ch.isConnected()) {
- LOG.error("Shuffle error " + e);
+ if (ctx.channel().isActive()) {
+ LOG.error("Shuffle error", cause);
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
diff --git a/tez-plugins/tez-aux-services/pom.xml b/tez-plugins/tez-aux-services/pom.xml
index a0f4f94..202afc4 100644
--- a/tez-plugins/tez-aux-services/pom.xml
+++ b/tez-plugins/tez-aux-services/pom.xml
@@ -112,7 +112,7 @@
</dependency>
<dependency>
<groupId>io.netty</groupId>
- <artifactId>netty</artifactId>
+ <artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
@@ -237,7 +237,7 @@
<shadedPattern>org.apache.tez.shaded.$0</shadedPattern>
</relocation>
<relocation>
- <pattern>org.jboss.netty</pattern>
+ <pattern>io.netty</pattern>
<shadedPattern>org.apache.tez.shaded.$0</shadedPattern>
</relocation>
<relocation>
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
index cc3f762..162feb9 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedChunkedFile.java
@@ -31,7 +31,9 @@ import org.slf4j.LoggerFactory;
import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
-import org.jboss.netty.handler.stream.ChunkedFile;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.stream.ChunkedFile;
public class FadvisedChunkedFile extends ChunkedFile {
@@ -57,13 +59,13 @@ public class FadvisedChunkedFile extends ChunkedFile {
}
@Override
- public Object nextChunk() throws Exception {
+ public ByteBuf readChunk(ChannelHandlerContext ctx) throws Exception {
if (manageOsCache && readaheadPool != null) {
readaheadRequest = readaheadPool
- .readaheadStream(identifier, fd, getCurrentOffset(), readaheadLength,
- getEndOffset(), readaheadRequest);
+ .readaheadStream(identifier, fd, currentOffset(), readaheadLength,
+ endOffset(), readaheadRequest);
}
- return super.nextChunk();
+ return super.readChunk(ctx);
}
@Override
@@ -71,11 +73,11 @@ public class FadvisedChunkedFile extends ChunkedFile {
if (readaheadRequest != null) {
readaheadRequest.cancel();
}
- if (manageOsCache && getEndOffset() - getStartOffset() > 0) {
+ if (manageOsCache && endOffset() - startOffset() > 0) {
try {
NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
fd,
- getStartOffset(), getEndOffset() - getStartOffset(),
+ startOffset(), endOffset() - startOffset(),
POSIX_FADV_DONTNEED);
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
index 40789d8..2366363 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/FadvisedFileRegion.java
@@ -34,7 +34,7 @@ import org.slf4j.LoggerFactory;
import static org.apache.hadoop.io.nativeio.NativeIO.POSIX.POSIX_FADV_DONTNEED;
-import org.jboss.netty.channel.DefaultFileRegion;
+import io.netty.channel.DefaultFileRegion;
import com.google.common.annotations.VisibleForTesting;
@@ -54,6 +54,7 @@ public class FadvisedFileRegion extends DefaultFileRegion {
private final FileChannel fileChannel;
private ReadaheadRequest readaheadRequest;
+ private boolean transferred = false;
public FadvisedFileRegion(RandomAccessFile file, long position, long count,
boolean manageOsCache, int readaheadLength, ReadaheadPool readaheadPool,
@@ -77,15 +78,40 @@ public class FadvisedFileRegion extends DefaultFileRegion {
throws IOException {
if (readaheadPool != null && readaheadLength > 0) {
readaheadRequest = readaheadPool.readaheadStream(identifier, fd,
- getPosition() + position, readaheadLength,
- getPosition() + getCount(), readaheadRequest);
+ position() + position, readaheadLength,
+ position() + count(), readaheadRequest);
}
+ long written = 0;
if(this.shuffleTransferToAllowed) {
- return super.transferTo(target, position);
+ written = super.transferTo(target, position);
} else {
- return customShuffleTransfer(target, position);
+ written = customShuffleTransfer(target, position);
}
+ /*
+ * At this point, we can assume that the transfer was successful.
+ */
+ transferred = true;
+ return written;
+ }
+
+ /**
+ * Since Netty4, deallocate() is called automatically during cleanup, but before the
+ * ChannelFutureListeners. Deallocate calls FileChannel.close() and makes the file descriptor
+ * invalid, so every OS cache operation (e.g. posix_fadvise) with the original file descriptor
+ * will fail after this operation, so we need to take care of cleanup operations here (before
+ * deallocating) instead of listeners outside.
+ */
+ @Override
+ protected void deallocate() {
+ if (readaheadRequest != null) {
+ readaheadRequest.cancel();
+ }
+
+ if (transferred) {
+ transferSuccessful();
+ }
+ super.deallocate();
}
/**
@@ -142,24 +168,19 @@ public class FadvisedFileRegion extends DefaultFileRegion {
return actualCount - trans;
}
-
- @Override
- public void releaseExternalResources() {
- if (readaheadRequest != null) {
- readaheadRequest.cancel();
- }
- super.releaseExternalResources();
- }
-
/**
* Call when the transfer completes successfully so we can advise the OS that
* we don't need the region to be cached anymore.
*/
public void transferSuccessful() {
- if (manageOsCache && getCount() > 0) {
+ if (manageOsCache && count() > 0) {
try {
- NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier,
- fd, getPosition(), getCount(), POSIX_FADV_DONTNEED);
+ if (fd.valid()) {
+ NativeIO.POSIX.getCacheManipulator().posixFadviseIfPossible(identifier, fd, position(),
+ count(), POSIX_FADV_DONTNEED);
+ } else {
+ LOG.debug("File descriptor is not valid anymore, skipping posix_fadvise: " + identifier);
+ }
} catch (Throwable t) {
LOG.warn("Failed to manage OS cache for " + identifier, t);
}
diff --git a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
index c4c2970..c56a0a8 100644
--- a/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/main/java/org/apache/tez/auxservices/ShuffleHandler.java
@@ -21,17 +21,17 @@ package org.apache.tez.auxservices;
import org.apache.hadoop.util.DiskChecker;
import static org.fusesource.leveldbjni.JniDBFactory.asString;
import static org.fusesource.leveldbjni.JniDBFactory.bytes;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
-import static org.jboss.netty.handler.codec.http.HttpMethod.GET;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
+import static io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
+import static io.netty.handler.codec.http.HttpMethod.GET;
+import static io.netty.handler.codec.http.HttpResponseStatus.BAD_REQUEST;
+import static io.netty.handler.codec.http.HttpResponseStatus.FORBIDDEN;
+import static io.netty.handler.codec.http.HttpResponseStatus.INTERNAL_SERVER_ERROR;
+import static io.netty.handler.codec.http.HttpResponseStatus.METHOD_NOT_ALLOWED;
+import static io.netty.handler.codec.http.HttpResponseStatus.NOT_FOUND;
+import static io.netty.handler.codec.http.HttpResponseStatus.OK;
+import static io.netty.handler.codec.http.HttpResponseStatus.UNAUTHORIZED;
+import static io.netty.handler.codec.http.HttpVersion.HTTP_1_1;
import java.io.File;
import java.io.FileNotFoundException;
@@ -48,7 +48,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
@@ -100,46 +100,47 @@ import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.Logger;
import org.iq80.leveldb.Options;
-import org.jboss.netty.bootstrap.ServerBootstrap;
-import org.jboss.netty.buffer.ChannelBuffers;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFactory;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-import org.jboss.netty.channel.ChannelHandler;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.ChannelPipelineFactory;
-import org.jboss.netty.channel.ChannelStateEvent;
-import org.jboss.netty.channel.Channels;
-import org.jboss.netty.channel.ExceptionEvent;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
-import org.jboss.netty.channel.group.ChannelGroup;
-import org.jboss.netty.channel.group.DefaultChannelGroup;
-import org.jboss.netty.channel.socket.nio.NioServerBossPool;
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
-import org.jboss.netty.channel.socket.nio.NioWorkerPool;
-import org.jboss.netty.handler.codec.frame.TooLongFrameException;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpChunkAggregator;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpRequestDecoder;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseEncoder;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.QueryStringDecoder;
-import org.jboss.netty.handler.ssl.SslHandler;
-import org.jboss.netty.handler.stream.ChunkedWriteHandler;
-import org.jboss.netty.handler.timeout.IdleState;
-import org.jboss.netty.handler.timeout.IdleStateAwareChannelHandler;
-import org.jboss.netty.handler.timeout.IdleStateEvent;
-import org.jboss.netty.handler.timeout.IdleStateHandler;
-import org.jboss.netty.util.CharsetUtil;
-import org.jboss.netty.util.HashedWheelTimer;
-import org.jboss.netty.util.ThreadNameDeterminer;
-import org.jboss.netty.util.Timer;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler.Sharable;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.group.ChannelGroup;
+import io.netty.channel.group.DefaultChannelGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.handler.codec.http.DefaultFullHttpResponse;
+import io.netty.handler.codec.http.DefaultHttpResponse;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.FullHttpResponse;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpObjectAggregator;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpRequestDecoder;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseEncoder;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.LastHttpContent;
+import io.netty.handler.codec.http.QueryStringDecoder;
+import io.netty.handler.ssl.SslHandler;
+import io.netty.handler.stream.ChunkedWriteHandler;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.handler.timeout.IdleStateHandler;
+import io.netty.util.CharsetUtil;
+import io.netty.util.concurrent.GlobalEventExecutor;
+
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
@@ -180,11 +181,15 @@ public class ShuffleHandler extends AuxiliaryService {
private static final String INDEX_FILE_NAME = "file.out.index";
private int port;
- private ChannelFactory selector;
- private final ChannelGroup accepted = new DefaultChannelGroup();
- protected HttpPipelineFactory pipelineFact;
+ private NioEventLoopGroup bossGroup;
+ private NioEventLoopGroup workerGroup;
+ private final ChannelGroup accepted = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
private int sslFileBufferSize;
+ // pipeline items
+ private Shuffle SHUFFLE;
+ private SSLFactory sslFactory;
+
/**
* Should the shuffle use posix_fadvise calls to manage the OS cache during
* sendfile
@@ -260,7 +265,6 @@ public class ShuffleHandler extends AuxiliaryService {
boolean connectionKeepAliveEnabled = false;
private int connectionKeepAliveTimeOut;
private int mapOutputMetaInfoCacheSize;
- private Timer timer;
@Metrics(about="Shuffle output metrics", context="mapred", name="tez")
static class ShuffleMetrics implements ChannelFutureListener {
@@ -297,7 +301,7 @@ public class ShuffleHandler extends AuxiliaryService {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
- future.getChannel().close();
+ future.channel().close();
return;
}
int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
@@ -305,21 +309,21 @@ public class ShuffleHandler extends AuxiliaryService {
metrics.operationComplete(future);
// Let the idle timer handler close keep-alive connections
if (reduceContext.getKeepAlive()) {
- ChannelPipeline pipeline = future.getChannel().getPipeline();
+ ChannelPipeline pipeline = future.channel().pipeline();
TimeoutHandler timeoutHandler =
(TimeoutHandler) pipeline.get(TIMEOUT_HANDLER);
timeoutHandler.setEnabledTimeout(true);
} else {
- future.getChannel().close();
+ future.channel().close();
}
} else {
- pipelineFact.getSHUFFLE().sendMap(reduceContext);
+ SHUFFLE.sendMap(reduceContext);
}
}
}
/**
- * Maintain parameters per messageReceived() Netty context.
+ * Maintain parameters per channelRead() Netty context.
* Allows sendMapOutput calls from operationComplete()
*/
private static class ReduceContext {
@@ -416,9 +420,11 @@ public class ShuffleHandler extends AuxiliaryService {
*/
public static ByteBuffer serializeMetaData(int port) throws IOException {
//TODO these bytes should be versioned
- DataOutputBuffer port_dob = new DataOutputBuffer();
- port_dob.writeInt(port);
- return ByteBuffer.wrap(port_dob.getData(), 0, port_dob.getLength());
+ DataOutputBuffer portDob = new DataOutputBuffer();
+ portDob.writeInt(port);
+ ByteBuffer buf = ByteBuffer.wrap(portDob.getData(), 0, portDob.getLength());
+ portDob.close();
+ return buf;
}
/**
@@ -431,6 +437,7 @@ public class ShuffleHandler extends AuxiliaryService {
DataInputByteBuffer in = new DataInputByteBuffer();
in.reset(meta);
int port = in.readInt();
+ in.close();
return port;
}
@@ -513,22 +520,23 @@ public class ShuffleHandler extends AuxiliaryService {
DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
final String BOSS_THREAD_NAME_PREFIX = "Tez Shuffle Handler Boss #";
- NioServerBossPool bossPool = new NioServerBossPool(Executors.newCachedThreadPool(), 1, new ThreadNameDeterminer() {
+ AtomicInteger bossThreadCounter = new AtomicInteger(0);
+ bossGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
@Override
- public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
- return BOSS_THREAD_NAME_PREFIX + currentThreadName.substring(currentThreadName.lastIndexOf('-') + 1);
+ public Thread newThread(Runnable r) {
+ return new Thread(r, BOSS_THREAD_NAME_PREFIX + bossThreadCounter.incrementAndGet());
}
});
final String WORKER_THREAD_NAME_PREFIX = "Tez Shuffle Handler Worker #";
- NioWorkerPool workerPool = new NioWorkerPool(Executors.newCachedThreadPool(), maxShuffleThreads, new ThreadNameDeterminer() {
+ AtomicInteger workerThreadCounter = new AtomicInteger(0);
+ workerGroup = new NioEventLoopGroup(maxShuffleThreads, new ThreadFactory() {
@Override
- public String determineThreadName(String currentThreadName, String proposedThreadName) throws Exception {
- return WORKER_THREAD_NAME_PREFIX + currentThreadName.substring(currentThreadName.lastIndexOf('-') + 1);
+ public Thread newThread(Runnable r) {
+ return new Thread(r, WORKER_THREAD_NAME_PREFIX + workerThreadCounter.incrementAndGet());
}
});
- selector = new NioServerSocketChannelFactory(bossPool, workerPool);
super.serviceInit(new YarnConfiguration(conf));
}
@@ -539,25 +547,24 @@ public class ShuffleHandler extends AuxiliaryService {
userRsrc = new ConcurrentHashMap<String,String>();
secretManager = new JobTokenSecretManager();
recoverState(conf);
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- // Timer is shared across entire factory and must be released separately
- timer = new HashedWheelTimer();
- try {
- pipelineFact = new HttpPipelineFactory(conf, timer);
- } catch (Exception ex) {
- throw new RuntimeException(ex);
- }
- bootstrap.setOption("backlog", conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE,
- DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE));
- bootstrap.setOption("child.keepAlive", true);
- bootstrap.setPipelineFactory(pipelineFact);
+ ServerBootstrap bootstrap = new ServerBootstrap()
+ .channel(NioServerSocketChannel.class)
+ .group(bossGroup, workerGroup)
+ .localAddress(port)
+ .option(ChannelOption.SO_BACKLOG,
+ conf.getInt(SHUFFLE_LISTEN_QUEUE_SIZE, DEFAULT_SHUFFLE_LISTEN_QUEUE_SIZE))
+ .childOption(ChannelOption.SO_KEEPALIVE, true);
+ initPipeline(bootstrap, conf);
port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
- Channel ch = bootstrap.bind(new InetSocketAddress(port));
+ Channel ch = bootstrap.bind().sync().channel();
accepted.add(ch);
- port = ((InetSocketAddress)ch.getLocalAddress()).getPort();
+
+ // setup port
+ port = ((InetSocketAddress)ch.localAddress()).getPort();
conf.set(SHUFFLE_PORT_CONFIG_KEY, Integer.toString(port));
- pipelineFact.SHUFFLE.setPort(port);
+ SHUFFLE.setPort(port);
LOG.info(getName() + " listening on port " + port);
+
super.serviceStart();
sslFileBufferSize = conf.getInt(SUFFLE_SSL_FILE_BUFFER_SIZE_KEY,
@@ -573,20 +580,50 @@ public class ShuffleHandler extends AuxiliaryService {
DEFAULT_SHUFFLE_MAPOUTPUT_META_INFO_CACHE_SIZE));
}
+ private void initPipeline(ServerBootstrap bootstrap, Configuration conf) throws Exception {
+ SHUFFLE = getShuffle(conf);
+ if (conf.getBoolean(SHUFFLE_SSL_ENABLED_KEY, SHUFFLE_SSL_ENABLED_DEFAULT)) {
+ LOG.info("Encrypted shuffle is enabled.");
+ sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
+ sslFactory.init();
+ }
+
+ ChannelInitializer<NioSocketChannel> channelInitializer =
+ new ChannelInitializer<NioSocketChannel>() {
+ @Override
+ public void initChannel(NioSocketChannel ch) throws Exception {
+ ChannelPipeline pipeline = ch.pipeline();
+ if (sslFactory != null) {
+ pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+ }
+ pipeline.addLast("decoder", new HttpRequestDecoder());
+ pipeline.addLast("aggregator", new HttpObjectAggregator(1 << 16));
+ pipeline.addLast("encoder", new HttpResponseEncoder());
+ pipeline.addLast("chunking", new ChunkedWriteHandler());
+ pipeline.addLast("shuffle", SHUFFLE);
+ pipeline.addLast("idle", new IdleStateHandler(0, connectionKeepAliveTimeOut, 0));
+ pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
+ }
+ };
+ bootstrap.childHandler(channelInitializer);
+ }
+
+ private void destroyPipeline() {
+ if (sslFactory != null) {
+ sslFactory.destroy();
+ }
+ }
+
@Override
protected void serviceStop() throws Exception {
accepted.close().awaitUninterruptibly(10, TimeUnit.SECONDS);
- if (selector != null) {
- ServerBootstrap bootstrap = new ServerBootstrap(selector);
- bootstrap.releaseExternalResources();
- }
- if (pipelineFact != null) {
- pipelineFact.destroy();
+ if (bossGroup != null) {
+ bossGroup.shutdownGracefully();
}
- if (timer != null) {
- // Release this shared timer resource
- timer.stop();
+ if (workerGroup != null) {
+ workerGroup.shutdownGracefully();
}
+ destroyPipeline();
if (stateDb != null) {
stateDb.close();
}
@@ -793,7 +830,7 @@ public class ShuffleHandler extends AuxiliaryService {
}
}
- static class TimeoutHandler extends IdleStateAwareChannelHandler {
+ static class TimeoutHandler extends ChannelDuplexHandler {
private boolean enabledTimeout;
@@ -802,59 +839,14 @@ public class ShuffleHandler extends AuxiliaryService {
}
@Override
- public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) {
- if (e.getState() == IdleState.WRITER_IDLE && enabledTimeout) {
- e.getChannel().close();
- }
- }
- }
-
- class HttpPipelineFactory implements ChannelPipelineFactory {
-
- final Shuffle SHUFFLE;
- private SSLFactory sslFactory;
- private final ChannelHandler idleStateHandler;
-
- public HttpPipelineFactory(Configuration conf, Timer timer) throws Exception {
- SHUFFLE = getShuffle(conf);
- if (conf.getBoolean(SHUFFLE_SSL_ENABLED_KEY,
- SHUFFLE_SSL_ENABLED_DEFAULT)) {
- LOG.info("Encrypted shuffle is enabled.");
- sslFactory = new SSLFactory(SSLFactory.Mode.SERVER, conf);
- sslFactory.init();
- }
- this.idleStateHandler = new IdleStateHandler(timer, 0, connectionKeepAliveTimeOut, 0);
- }
-
- public Shuffle getSHUFFLE() {
- return SHUFFLE;
- }
-
- public void destroy() {
- if (sslFactory != null) {
- sslFactory.destroy();
- }
- }
-
- @Override
- public ChannelPipeline getPipeline() throws Exception {
- ChannelPipeline pipeline = Channels.pipeline();
- if (sslFactory != null) {
- pipeline.addLast("ssl", new SslHandler(sslFactory.createSSLEngine()));
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
+ if (evt instanceof IdleStateEvent) {
+ IdleStateEvent e = (IdleStateEvent) evt;
+ if (e.state() == IdleState.WRITER_IDLE && enabledTimeout) {
+ ctx.channel().close();
+ }
}
- pipeline.addLast("decoder", new HttpRequestDecoder());
- pipeline.addLast("aggregator", new HttpChunkAggregator(1 << 16));
- pipeline.addLast("encoder", new HttpResponseEncoder());
- pipeline.addLast("chunking", new ChunkedWriteHandler());
- pipeline.addLast("shuffle", SHUFFLE);
- pipeline.addLast("idle", idleStateHandler);
- pipeline.addLast(TIMEOUT_HANDLER, new TimeoutHandler());
- return pipeline;
- // TODO factor security manager into pipeline
- // TODO factor out encode/decode to permit binary shuffle
- // TODO factor out decode of index to permit alt. models
}
-
}
protected static class Range {
@@ -880,7 +872,8 @@ public class ShuffleHandler extends AuxiliaryService {
}
}
- class Shuffle extends SimpleChannelUpstreamHandler {
+ @Sharable
+ class Shuffle extends ChannelInboundHandlerAdapter {
private static final int MAX_WEIGHT = 10 * 1024 * 1024;
private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
@@ -966,37 +959,43 @@ public class ShuffleHandler extends AuxiliaryService {
}
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent evt)
+ public void channelActive(ChannelHandlerContext ctx)
throws Exception {
if ((maxShuffleConnections > 0) && (accepted.size() >= maxShuffleConnections)) {
LOG.info(String.format("Current number of shuffle connections (%d) is " +
"greater than or equal to the max allowed shuffle connections (%d)",
accepted.size(), maxShuffleConnections));
- evt.getChannel().close();
+ ctx.channel().close();
return;
}
- accepted.add(evt.getChannel());
- super.channelOpen(ctx, evt);
+ accepted.add(ctx.channel());
+ super.channelActive(ctx);
}
@Override
- public void messageReceived(ChannelHandlerContext ctx, MessageEvent evt)
+ public void channelRead(ChannelHandlerContext ctx, Object message)
throws Exception {
- HttpRequest request = (HttpRequest) evt.getMessage();
+ FullHttpRequest request = (FullHttpRequest) message;
+ handleRequest(ctx, request);
+ request.release();
+ }
+
+ private void handleRequest(ChannelHandlerContext ctx, FullHttpRequest request)
+ throws IOException, Exception {
if (request.getMethod() != GET) {
sendError(ctx, METHOD_NOT_ALLOWED);
return;
}
// Check whether the shuffle version is compatible
if (!ShuffleHeader.DEFAULT_HTTP_HEADER_NAME.equals(
- request.getHeader(ShuffleHeader.HTTP_HEADER_NAME))
+ request.headers().get(ShuffleHeader.HTTP_HEADER_NAME))
|| !ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION.equals(
- request.getHeader(ShuffleHeader.HTTP_HEADER_VERSION))) {
+ request.headers().get(ShuffleHeader.HTTP_HEADER_VERSION))) {
sendError(ctx, "Incompatible shuffle request version", BAD_REQUEST);
+ return;
}
- final Map<String,List<String>> q =
- new QueryStringDecoder(request.getUri()).getParameters();
+ final Map<String, List<String>> q = new QueryStringDecoder(request.getUri()).parameters();
final List<String> keepAliveList = q.get("keepAlive");
final List<String> dagCompletedQ = q.get("dagAction");
boolean keepAliveParam = false;
@@ -1017,7 +1016,7 @@ public class ShuffleHandler extends AuxiliaryService {
"\n keepAlive: " + keepAliveParam);
}
// If the request is for Dag Deletion, process the request and send OK.
- if (deleteDagDirectories(evt, dagCompletedQ, jobQ, dagIdQ)) {
+ if (deleteDagDirectories(ctx.channel(), dagCompletedQ, jobQ, dagIdQ)) {
return;
}
if (mapIds == null || reduceRange == null || jobQ == null || dagIdQ == null) {
@@ -1066,8 +1065,8 @@ public class ShuffleHandler extends AuxiliaryService {
Map<String, MapOutputInfo> mapOutputInfoMap =
new HashMap<String, MapOutputInfo>();
- Channel ch = evt.getChannel();
- ChannelPipeline pipeline = ch.getPipeline();
+ Channel ch = ctx.channel();
+ ChannelPipeline pipeline = ch.pipeline();
TimeoutHandler timeoutHandler = (TimeoutHandler)pipeline.get(TIMEOUT_HANDLER);
timeoutHandler.setEnabledTimeout(false);
String user = userRsrc.get(jobId);
@@ -1083,19 +1082,23 @@ public class ShuffleHandler extends AuxiliaryService {
return;
}
ch.write(response);
- //Initialize one ReduceContext object per messageReceived call
+ //Initialize one ReduceContext object per channelRead call
boolean keepAlive = keepAliveParam || connectionKeepAliveEnabled;
ReduceContext reduceContext = new ReduceContext(mapIds, reduceRange, ctx,
user, mapOutputInfoMap, jobId, dagId, keepAlive);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
ChannelFuture nextMap = sendMap(reduceContext);
if(nextMap == null) {
+ // flushing this final empty-content message makes sure the whole response is finished
+ ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
return;
}
}
+ // flushing this final empty-content message makes sure the whole response is finished
+ ch.writeAndFlush(LastHttpContent.EMPTY_LAST_CONTENT);
}
- private boolean deleteDagDirectories(MessageEvent evt,
+ private boolean deleteDagDirectories(Channel channel,
List<String> dagCompletedQ, List<String> jobQ,
List<String> dagIdQ) {
if (jobQ == null || jobQ.isEmpty()) {
@@ -1112,8 +1115,8 @@ public class ShuffleHandler extends AuxiliaryService {
} catch (IOException e) {
LOG.warn("Encountered exception during dag delete "+ e);
}
- evt.getChannel().write(new DefaultHttpResponse(HTTP_1_1, OK));
- evt.getChannel().close();
+ channel.writeAndFlush(new DefaultHttpResponse(HTTP_1_1, OK))
+ .addListener(ChannelFutureListener.CLOSE);
return true;
}
return false;
@@ -1121,7 +1124,7 @@ public class ShuffleHandler extends AuxiliaryService {
/**
* Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
- * and increments it. This method is first called by messageReceived()
+ * and increments it. This method is first called by channelRead()
* maxSessionOpenFiles times and then on the completion of every
* sendMapOutput operation. This limits the number of open files on a node,
* which can get really large(exhausting file descriptors on the NM) if all
@@ -1131,7 +1134,6 @@ public class ShuffleHandler extends AuxiliaryService {
*/
public ChannelFuture sendMap(ReduceContext reduceContext)
throws Exception {
-
ChannelFuture nextMap = null;
if (reduceContext.getMapsToSend().get() <
reduceContext.getMapIds().size()) {
@@ -1140,14 +1142,16 @@ public class ShuffleHandler extends AuxiliaryService {
try {
MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
+
if (info == null) {
info = getMapOutputInfo(reduceContext.dagId, mapId, reduceContext.getReduceRange(),
reduceContext.getJobId(),
reduceContext.getUser());
}
+
nextMap = sendMapOutput(
reduceContext.getCtx(),
- reduceContext.getCtx().getChannel(),
+ reduceContext.getCtx().channel(),
reduceContext.getUser(), mapId,
reduceContext.getReduceRange(), info);
if (null == nextMap) {
@@ -1277,13 +1281,13 @@ public class ShuffleHandler extends AuxiliaryService {
protected void setResponseHeaders(HttpResponse response, boolean keepAliveParam, long contentLength) {
if (connectionKeepAliveEnabled || keepAliveParam) {
- response.setHeader(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength));
- response.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
- response.setHeader(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut);
+ response.headers().set(HttpHeaders.Names.CONTENT_LENGTH, String.valueOf(contentLength));
+ response.headers().set(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.KEEP_ALIVE);
+ response.headers().set(HttpHeaders.Values.KEEP_ALIVE, "timeout=" + connectionKeepAliveTimeOut);
LOG.debug("Content Length in shuffle : {}", contentLength);
} else {
LOG.debug("Setting connection close header...");
- response.setHeader(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
+ response.headers().set(HttpHeaders.Names.CONNECTION, CONNECTION_CLOSE);
}
}
@@ -1334,7 +1338,7 @@ public class ShuffleHandler extends AuxiliaryService {
String enc_str = SecureShuffleUtils.buildMsgFrom(requestUri);
// hash from the fetcher
String urlHashStr =
- request.getHeader(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
+ request.headers().get(SecureShuffleUtils.HTTP_HEADER_URL_HASH);
if (urlHashStr == null) {
LOG.info("Missing header hash for " + appid);
throw new IOException("fetcher cannot be authenticated");
@@ -1350,11 +1354,11 @@ public class ShuffleHandler extends AuxiliaryService {
String reply =
SecureShuffleUtils.generateHash(urlHashStr.getBytes(Charsets.UTF_8),
tokenSecret);
- response.setHeader(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
+ response.headers().set(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH, reply);
// Put shuffle version into http header
- response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+ response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
- response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+ response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
if (LOG.isDebugEnabled()) {
int len = reply.length();
@@ -1402,23 +1406,12 @@ public class ShuffleHandler extends AuxiliaryService {
return null;
}
ChannelFuture writeFuture;
- if (ch.getPipeline().get(SslHandler.class) == null) {
+ if (ch.pipeline().get(SslHandler.class) == null) {
final FadvisedFileRegion partition = new FadvisedFileRegion(spill,
rangeOffset, rangePartLength, manageOsCache, readaheadLength,
readaheadPool, spillFile.getAbsolutePath(),
shuffleBufferSize, shuffleTransferToAllowed);
writeFuture = ch.write(partition);
- writeFuture.addListener(new ChannelFutureListener() {
- // TODO error handling; distinguish IO/connection failures,
- // attribute to appropriate spill output
- @Override
- public void operationComplete(ChannelFuture future) {
- if (future.isSuccess()) {
- partition.transferSuccessful();
- }
- partition.releaseExternalResources();
- }
- });
} else {
// HTTPS cannot be done with zero copy.
final FadvisedChunkedFile chunk = new FadvisedChunkedFile(spill,
@@ -1437,27 +1430,25 @@ public class ShuffleHandler extends AuxiliaryService {
sendError(ctx, "", status);
}
- protected void sendError(ChannelHandlerContext ctx, String message,
- HttpResponseStatus status) {
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, status);
- response.setHeader(CONTENT_TYPE, "text/plain; charset=UTF-8");
+ protected void sendError(ChannelHandlerContext ctx, String message, HttpResponseStatus status) {
+ FullHttpResponse response = new DefaultFullHttpResponse(HTTP_1_1, status);
+
+ response.headers().set(CONTENT_TYPE, "text/plain; charset=UTF-8");
// Put shuffle version into http header
- response.setHeader(ShuffleHeader.HTTP_HEADER_NAME,
+ response.headers().set(ShuffleHeader.HTTP_HEADER_NAME,
ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
- response.setHeader(ShuffleHeader.HTTP_HEADER_VERSION,
+ response.headers().set(ShuffleHeader.HTTP_HEADER_VERSION,
ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
- response.setContent(
- ChannelBuffers.copiedBuffer(message, CharsetUtil.UTF_8));
+ response.content().writeBytes(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
// Close the connection as soon as the error message is sent.
- ctx.getChannel().write(response).addListener(ChannelFutureListener.CLOSE);
+ ctx.channel().writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
+ response.release();
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
throws Exception {
- Channel ch = e.getChannel();
- Throwable cause = e.getCause();
if (cause instanceof TooLongFrameException) {
sendError(ctx, BAD_REQUEST);
return;
@@ -1474,8 +1465,8 @@ public class ShuffleHandler extends AuxiliaryService {
}
LOG.error("Shuffle error: ", cause);
- if (ch.isConnected()) {
- LOG.error("Shuffle error " + e);
+ if (ctx.channel().isActive()) {
+ LOG.error("Shuffle error", cause);
sendError(ctx, INTERNAL_SERVER_ERROR);
}
}
diff --git a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
index a610236..2bf9cb2 100644
--- a/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
+++ b/tez-plugins/tez-aux-services/src/test/java/org/apache/tez/auxservices/TestShuffleHandler.java
@@ -21,9 +21,7 @@ package org.apache.tez.auxservices;
//import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
//import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertTrue;
-import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
-import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
-import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
+import static io.netty.buffer.Unpooled.wrappedBuffer;
import static org.junit.Assert.assertEquals;
import static org.junit.Assume.assumeTrue;
import static org.mockito.Mockito.mock;
@@ -44,6 +42,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.CheckedOutputStream;
import java.util.zip.Checksum;
@@ -57,9 +56,6 @@ import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
-import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.MapTask;
-import org.apache.hadoop.mapreduce.TypeConverter;
import org.apache.tez.runtime.library.common.security.SecureShuffleUtils;
import org.apache.tez.common.security.JobTokenIdentifier;
import org.apache.tez.common.security.JobTokenSecretManager;
@@ -78,19 +74,21 @@ import org.apache.hadoop.yarn.server.api.ApplicationTerminationContext;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.tez.runtime.library.common.sort.impl.TezIndexRecord;
import org.apache.tez.runtime.library.common.sort.impl.TezSpillRecord;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelHandlerContext;
-import org.jboss.netty.channel.ChannelPipeline;
-import org.jboss.netty.channel.socket.SocketChannel;
-import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.AbstractChannel;
-import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
-import org.jboss.netty.handler.codec.http.HttpHeaders;
-import org.jboss.netty.handler.codec.http.HttpRequest;
-import org.jboss.netty.handler.codec.http.HttpResponse;
-import org.jboss.netty.handler.codec.http.HttpResponseStatus;
-import org.jboss.netty.handler.codec.http.HttpMethod;
+
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.AbstractChannel;
+import io.netty.handler.codec.http.DefaultFullHttpRequest;
+import io.netty.handler.codec.http.FullHttpRequest;
+import io.netty.handler.codec.http.HttpHeaders;
+import io.netty.handler.codec.http.HttpRequest;
+import io.netty.handler.codec.http.HttpResponse;
+import io.netty.handler.codec.http.HttpResponseStatus;
+import io.netty.handler.codec.http.HttpVersion;
+import io.netty.handler.codec.http.HttpMethod;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
@@ -139,12 +137,12 @@ public class TestShuffleHandler {
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
};
}
@@ -160,8 +158,8 @@ public class TestShuffleHandler {
protected void verifyRequest(String appid, ChannelHandlerContext ctx,
HttpRequest request, HttpResponse response, URL requestUri)
throws IOException {
- SocketChannel channel = (SocketChannel)(ctx.getChannel());
- socketKeepAlive = channel.getConfig().isKeepAlive();
+ SocketChannel channel = (SocketChannel)(ctx.channel());
+ socketKeepAlive = channel.config().isKeepAlive();
}
};
}
@@ -210,6 +208,7 @@ public class TestShuffleHandler {
sh.metrics.operationComplete(cf);
checkShuffleMetrics(ms, 3*MiB, 1, 1, 0);
+ sh.close();
}
static void checkShuffleMetrics(MetricsSystem ms, long bytes, int failed,
@@ -231,7 +230,7 @@ public class TestShuffleHandler {
*/
@Test (timeout = 10000)
public void testClientClosesConnection() throws Exception {
- final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ final AtomicBoolean failureEncountered = new AtomicBoolean(false);
Configuration conf = new Configuration();
conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
@@ -273,27 +272,25 @@ public class TestShuffleHandler {
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100000; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
@Override
protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error());
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error());
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
};
@@ -320,9 +317,9 @@ public class TestShuffleHandler {
header.readFields(input);
input.close();
- shuffleHandler.stop();
+ shuffleHandler.close();
Assert.assertTrue("sendError called when client closed connection",
- failures.size() == 0);
+ !failureEncountered.get());
}
static class LastSocketAddress {
@@ -330,14 +327,14 @@ public class TestShuffleHandler {
void setAddress(SocketAddress lastAddress) {
this.lastAddress = lastAddress;
}
- SocketAddress getSocketAddres() {
+ SocketAddress getSocketAddress() {
return lastAddress;
}
}
@Test(timeout = 10000)
public void testKeepAlive() throws Exception {
- final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ final AtomicBoolean failureEncountered = new AtomicBoolean(false);
Configuration conf = new Configuration();
conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
@@ -395,8 +392,7 @@ public class TestShuffleHandler {
protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
Channel ch, String user, String mapId, Range reduceRange,
MapOutputInfo info) throws IOException {
- lastSocketAddress.setAddress(ch.getRemoteAddress());
- HttpResponse response = new DefaultHttpResponse(HTTP_1_1, OK);
+ lastSocketAddress.setAddress(ch.remoteAddress());
// send a shuffle header and a lot of data down the channel
// to trigger a broken pipe
@@ -404,29 +400,27 @@ public class TestShuffleHandler {
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i = 0; i < 100000; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
@Override
protected void sendError(ChannelHandlerContext ctx,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error());
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error());
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
};
@@ -457,7 +451,7 @@ public class TestShuffleHandler {
header.readFields(input);
byte[] buffer = new byte[1024];
while (input.read(buffer) != -1) {}
- SocketAddress firstAddress = lastSocketAddress.getSocketAddres();
+ SocketAddress firstAddress = lastSocketAddress.getSocketAddress();
input.close();
// For keepAlive via URL
@@ -479,11 +473,12 @@ public class TestShuffleHandler {
header = new ShuffleHeader();
header.readFields(input);
input.close();
- SocketAddress secondAddress = lastSocketAddress.getSocketAddres();
+ SocketAddress secondAddress = lastSocketAddress.getSocketAddress();
Assert.assertNotNull("Initial shuffle address should not be null", firstAddress);
Assert.assertNotNull("Keep-Alive shuffle address should not be null", secondAddress);
Assert.assertEquals("Initial shuffle address and keep-alive shuffle "
+ "address should be the same", firstAddress, secondAddress);
+ shuffleHandler.close();
}
@Test
@@ -519,7 +514,7 @@ public class TestShuffleHandler {
if (conn != null) {
conn.disconnect();
}
- shuffleHandler.stop();
+ shuffleHandler.close();
}
}
@@ -555,7 +550,6 @@ public class TestShuffleHandler {
HttpURLConnection.HTTP_BAD_REQUEST, conn.getResponseCode());
}
- shuffleHandler.stop();
shuffleHandler.close();
}
@@ -609,12 +603,12 @@ public class TestShuffleHandler {
new ShuffleHeader("dummy_header", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
dob = new DataOutputBuffer();
for (int i=0; i<100000; ++i) {
header.write(dob);
}
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
};
}
@@ -641,6 +635,10 @@ public class TestShuffleHandler {
// Try to open numerous connections
for (int i = 0; i < connAttempts; i++) {
+ // Open the connections at a slightly relaxed pace; otherwise the
+ // unsynchronized channelActive method will mess them up
+ Thread.sleep(200);
+
conns[i].connect();
}
@@ -664,7 +662,7 @@ public class TestShuffleHandler {
Assert.fail("Expected a SocketException");
}
- shuffleHandler.stop();
+ shuffleHandler.close();
}
/**
@@ -759,7 +757,7 @@ public class TestShuffleHandler {
}
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@@ -852,7 +850,7 @@ public class TestShuffleHandler {
+ " did not match expected owner '" + user + "'";
Assert.assertTrue((new String(byteArr)).contains(message));
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@@ -905,7 +903,6 @@ public class TestShuffleHandler {
public void testRecovery() throws IOException {
final String user = "someuser";
final ApplicationId appId = ApplicationId.newInstance(12345, 1);
- final JobID jobId = JobID.downgrade(TypeConverter.fromYarn(appId));
final File tmpDir = new File(System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestShuffleHandler.class.getName());
@@ -1083,7 +1080,7 @@ public class TestShuffleHandler {
@Test(timeout = 100000)
public void testGetMapOutputInfo() throws Exception {
- final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ final AtomicBoolean failureEncountered = new AtomicBoolean(false);
Configuration conf = new Configuration();
conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
@@ -1125,9 +1122,8 @@ public class TestShuffleHandler {
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error(message));
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
@Override
@@ -1139,7 +1135,7 @@ public class TestShuffleHandler {
new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ return ch.writeAndFlush(wrappedBuffer(dob.getData(), 0, dob.getLength()));
}
};
}
@@ -1179,16 +1175,16 @@ public class TestShuffleHandler {
// ignore
}
Assert.assertEquals("sendError called due to shuffle error",
- 0, failures.size());
+ false, failureEncountered.get());
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@Test(timeout = 5000)
public void testDagDelete() throws Exception {
- final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+ final AtomicBoolean failureEncountered = new AtomicBoolean(false);
Configuration conf = new Configuration();
conf.set(HADOOP_TMP_DIR, TEST_DIR.getAbsolutePath());
conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
@@ -1213,9 +1209,8 @@ public class TestShuffleHandler {
@Override
protected void sendError(ChannelHandlerContext ctx, String message,
HttpResponseStatus status) {
- if (failures.size() == 0) {
- failures.add(new Error(message));
- ctx.getChannel().close();
+ if (failureEncountered.compareAndSet(false, true)) {
+ ctx.channel().close();
}
}
};
@@ -1261,9 +1256,9 @@ public class TestShuffleHandler {
// ignore
}
Assert.assertEquals("sendError called due to shuffle error",
- 0, failures.size());
+ false, failureEncountered.get());
} finally {
- shuffleHandler.stop();
+ shuffleHandler.close();
FileUtil.fullyDelete(absLogDir);
}
}
@@ -1275,29 +1270,23 @@ public class TestShuffleHandler {
final ChannelHandlerContext mockCtx =
mock(ChannelHandlerContext.class);
- final MessageEvent mockEvt = mock(MessageEvent.class);
final Channel mockCh = mock(AbstractChannel.class);
final ChannelPipeline mockPipeline = Mockito.mock(ChannelPipeline.class);
// Mock HttpRequest and ChannelFuture
- final HttpRequest mockHttpRequest = createMockHttpRequest();
+ final FullHttpRequest httpRequest = createHttpRequest();
final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
listenerList);
final ShuffleHandler.TimeoutHandler timerHandler =
new ShuffleHandler.TimeoutHandler();
// Mock Netty Channel Context and Channel behavior
- Mockito.doReturn(mockCh).when(mockCtx).getChannel();
- Mockito.when(mockCh.getPipeline()).thenReturn(mockPipeline);
+ Mockito.doReturn(mockCh).when(mockCtx).channel();
+ Mockito.when(mockCh.pipeline()).thenReturn(mockPipeline);
Mockito.when(mockPipeline.get(Mockito.any(String.class))).thenReturn(timerHandler);
- when(mockCtx.getChannel()).thenReturn(mockCh);
- Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
- when(mockCh.write(Object.class)).thenReturn(mockFuture);
-
- //Mock MessageEvent behavior
- Mockito.doReturn(mockCh).when(mockEvt).getChannel();
- when(mockEvt.getChannel()).thenReturn(mockCh);
- Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();
+ when(mockCtx.channel()).thenReturn(mockCh);
+ Mockito.doReturn(mockFuture).when(mockCh).writeAndFlush(Mockito.any(Object.class));
+ when(mockCh.writeAndFlush(Object.class)).thenReturn(mockFuture);
final ShuffleHandler sh = new MockShuffleHandler();
Configuration conf = new Configuration();
@@ -1308,7 +1297,7 @@ public class TestShuffleHandler {
sh.start();
int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
- sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
+ sh.getShuffle(conf).channelRead(mockCtx, httpRequest);
assertTrue("Number of Open files should not exceed the configured " +
"value!-Not Expected",
listenerList.size() <= maxOpenFiles);
@@ -1324,9 +1313,9 @@ public class TestShuffleHandler {
public ChannelFuture createMockChannelFuture(Channel mockCh,
final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
final ChannelFuture mockFuture = mock(ChannelFuture.class);
- when(mockFuture.getChannel()).thenReturn(mockCh);
+ when(mockFuture.channel()).thenReturn(mockCh);
Mockito.doReturn(true).when(mockFuture).isSuccess();
- Mockito.doAnswer(new Answer() {
+ Mockito.doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
//Add ReduceMapFileCount listener to a list
@@ -1341,18 +1330,11 @@ public class TestShuffleHandler {
return mockFuture;
}
- public HttpRequest createMockHttpRequest() {
- HttpRequest mockHttpRequest = mock(HttpRequest.class);
- Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod();
- Mockito.doAnswer(new Answer() {
- @Override
- public Object answer(InvocationOnMock invocation) throws Throwable {
- String uri = "/mapOutput?job=job_12345_1&dag=1&reduce=1";
- for (int i = 0; i < 100; i++)
- uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
- return uri;
- }
- }).when(mockHttpRequest).getUri();
- return mockHttpRequest;
+ /**
+ * Builds the HTTP/1.1 GET request that the test passes to channelRead: a
+ * /mapOutput URI for job_12345_1, dag 1, reduce 1, followed by 100
+ * "map" parameters (attempt_12345_1_m_0_0 through attempt_12345_1_m_99_0).
+ *
+ * @return a concrete (non-mock) request carrying that URI
+ */
+ public FullHttpRequest createHttpRequest() {
+ // StringBuilder avoids re-copying the whole URI on each of the 100 appends
+ StringBuilder uri = new StringBuilder("/mapOutput?job=job_12345_1&dag=1&reduce=1");
+ for (int i = 0; i < 100; i++) {
+ uri.append("&map=attempt_12345_1_m_").append(i).append("_0");
+ }
+ return new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, uri.toString());
+ }
}