You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ec...@apache.org on 2015/09/15 21:12:01 UTC
[16/50] [abbrv] hadoop git commit: MAPREDUCE-6474. ShuffleHandler can
possibly exhaust nodemanager file descriptors. Contributed by Kuhu Shukla
MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file descriptors. Contributed by Kuhu Shukla
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/8e615588
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/8e615588
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/8e615588
Branch: refs/heads/HADOOP-11890
Commit: 8e615588d5216394d0251a9c97bd706537856c6d
Parents: a40342b
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 10 16:00:17 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 10 16:00:17 2015 +0000
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../apache/hadoop/mapred/ShuffleHandler.java | 177 +++++++++++++++++--
.../hadoop/mapred/TestShuffleHandler.java | 129 ++++++++++++++
3 files changed, 293 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e615588/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 428d37e..02c1f1f 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -590,6 +590,9 @@ Release 2.7.2 - UNRELEASED
MAPREDUCE-6442. Stack trace is missing when error occurs in client protocol
provider's constructor (Chang Li via ozawa)
+ MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file
+ descriptors (Kuhu Shukla via jlowe)
+
Release 2.7.1 - 2015-07-06
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e615588/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index ee1be23..7a078a8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import javax.crypto.SecretKey;
@@ -170,6 +171,7 @@ public class ShuffleHandler extends AuxiliaryService {
private int maxShuffleConnections;
private int shuffleBufferSize;
private boolean shuffleTransferToAllowed;
+ private int maxSessionOpenFiles;
private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
private Map<String,String> userRsrc;
@@ -220,6 +222,13 @@ public class ShuffleHandler extends AuxiliaryService {
public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED =
false;
+ /* the maximum number of files a single GET request can
+ open simultaneously during shuffle
+ */
+ public static final String SHUFFLE_MAX_SESSION_OPEN_FILES =
+ "mapreduce.shuffle.max.session-open-files";
+ public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3;
+
boolean connectionKeepAliveEnabled = false;
int connectionKeepAliveTimeOut;
int mapOutputMetaInfoCacheSize;
@@ -248,6 +257,104 @@ public class ShuffleHandler extends AuxiliaryService {
final ShuffleMetrics metrics;
+ class ReduceMapFileCount implements ChannelFutureListener {
+
+ private ReduceContext reduceContext;
+
+ public ReduceMapFileCount(ReduceContext rc) {
+ this.reduceContext = rc;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ future.getChannel().close();
+ return;
+ }
+ int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
+ if (waitCount == 0) {
+ metrics.operationComplete(future);
+ future.getChannel().close();
+ } else {
+ pipelineFact.getSHUFFLE().sendMap(reduceContext);
+ }
+ }
+ }
+
+ /**
+ * Maintain parameters per messageReceived() Netty context.
+ * Allows sendMapOutput calls from operationComplete()
+ */
+ private static class ReduceContext {
+
+ private List<String> mapIds;
+ private AtomicInteger mapsToWait;
+ private AtomicInteger mapsToSend;
+ private int reduceId;
+ private ChannelHandlerContext ctx;
+ private String user;
+ private Map<String, Shuffle.MapOutputInfo> infoMap;
+ private String outputBasePathStr;
+
+ public ReduceContext(List<String> mapIds, int rId,
+ ChannelHandlerContext context, String usr,
+ Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
+ String outputBasePath) {
+
+ this.mapIds = mapIds;
+ this.reduceId = rId;
+ /**
+ * Atomic count for tracking the no. of map outputs that are yet to
+ * complete. Multiple futureListeners' operationComplete() can decrement
+ * this value asynchronously. It is used to decide when the channel should
+ * be closed.
+ */
+ this.mapsToWait = new AtomicInteger(mapIds.size());
+ /**
+ * Atomic count for tracking the no. of map outputs that have been sent.
+ * Multiple sendMap() calls can increment this value
+ * asynchronously. Used to decide which mapId should be sent next.
+ */
+ this.mapsToSend = new AtomicInteger(0);
+ this.ctx = context;
+ this.user = usr;
+ this.infoMap = mapOutputInfoMap;
+ this.outputBasePathStr = outputBasePath;
+ }
+
+ public int getReduceId() {
+ return reduceId;
+ }
+
+ public ChannelHandlerContext getCtx() {
+ return ctx;
+ }
+
+ public String getUser() {
+ return user;
+ }
+
+ public Map<String, Shuffle.MapOutputInfo> getInfoMap() {
+ return infoMap;
+ }
+
+ public String getOutputBasePathStr() {
+ return outputBasePathStr;
+ }
+
+ public List<String> getMapIds() {
+ return mapIds;
+ }
+
+ public AtomicInteger getMapsToSend() {
+ return mapsToSend;
+ }
+
+ public AtomicInteger getMapsToWait() {
+ return mapsToWait;
+ }
+ }
+
ShuffleHandler(MetricsSystem ms) {
super("httpshuffle");
metrics = ms.register(new ShuffleMetrics());
@@ -357,6 +464,9 @@ public class ShuffleHandler extends AuxiliaryService {
(Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
+ maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES,
+ DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
+
ThreadFactory bossFactory = new ThreadFactoryBuilder()
.setNameFormat("ShuffleHandler Netty Boss #%d")
.build();
@@ -638,6 +748,10 @@ public class ShuffleHandler extends AuxiliaryService {
}
}
+ public Shuffle getSHUFFLE() {
+ return SHUFFLE;
+ }
+
public void destroy() {
if (sslFactory != null) {
sslFactory.destroy();
@@ -809,31 +923,62 @@ public class ShuffleHandler extends AuxiliaryService {
return;
}
ch.write(response);
- // TODO refactor the following into the pipeline
- ChannelFuture lastMap = null;
- for (String mapId : mapIds) {
+ //Initialize one ReduceContext object per messageReceived call
+ ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
+ user, mapOutputInfoMap, outputBasePathStr);
+ for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
+ ChannelFuture nextMap = sendMap(reduceContext);
+ if(nextMap == null) {
+ return;
+ }
+ }
+ }
+
+ /**
+ * Calls sendMapOutput for the mapId pointed by ReduceContext.mapsToSend
+ * and increments it. This method is first called by messageReceived()
+ * maxSessionOpenFiles times and then on the completion of every
+ * sendMapOutput operation. This limits the number of open files on a node,
+ * which can get really large(exhausting file descriptors on the NM) if all
+ * sendMapOutputs are called in one go, as was done previous to this change.
+ * @param reduceContext used to call sendMapOutput with correct params.
+ * @return the ChannelFuture of the sendMapOutput, can be null.
+ */
+ public ChannelFuture sendMap(ReduceContext reduceContext)
+ throws Exception {
+
+ ChannelFuture nextMap = null;
+ if (reduceContext.getMapsToSend().get() <
+ reduceContext.getMapIds().size()) {
+ int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
+ String mapId = reduceContext.getMapIds().get(nextIndex);
+
try {
- MapOutputInfo info = mapOutputInfoMap.get(mapId);
+ MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
if (info == null) {
- info = getMapOutputInfo(outputBasePathStr + mapId,
- mapId, reduceId, user);
+ info = getMapOutputInfo(reduceContext.getOutputBasePathStr() +
+ mapId, mapId, reduceContext.getReduceId(),
+ reduceContext.getUser());
}
- lastMap =
- sendMapOutput(ctx, ch, user, mapId,
- reduceId, info);
- if (null == lastMap) {
- sendError(ctx, NOT_FOUND);
- return;
+ nextMap = sendMapOutput(
+ reduceContext.getCtx(),
+ reduceContext.getCtx().getChannel(),
+ reduceContext.getUser(), mapId,
+ reduceContext.getReduceId(), info);
+ if (null == nextMap) {
+ sendError(reduceContext.getCtx(), NOT_FOUND);
+ return null;
}
+ nextMap.addListener(new ReduceMapFileCount(reduceContext));
} catch (IOException e) {
LOG.error("Shuffle error :", e);
String errorMessage = getErrorMessage(e);
- sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
- return;
+ sendError(reduceContext.getCtx(), errorMessage,
+ INTERNAL_SERVER_ERROR);
+ return null;
}
}
- lastMap.addListener(metrics);
- lastMap.addListener(ChannelFutureListener.CLOSE);
+ return nextMap;
}
private String getErrorMessage(Throwable t) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/8e615588/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index bad9b2d..f16fe5a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.apache.hadoop.test.MockitoMaker.make;
import static org.apache.hadoop.test.MockitoMaker.stub;
+import static org.junit.Assert.assertTrue;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
@@ -79,18 +80,66 @@ import org.apache.hadoop.yarn.server.records.Version;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.AbstractChannel;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpMethod;
import org.junit.Assert;
import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.Mockito;
import org.mortbay.jetty.HttpHeaders;
public class TestShuffleHandler {
static final long MiB = 1024 * 1024;
private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
+ class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler {
+ @Override
+ protected Shuffle getShuffle(final Configuration conf) {
+ return new Shuffle(conf) {
+ @Override
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+ HttpRequest request, HttpResponse response, URL requestUri)
+ throws IOException {
+ }
+ @Override
+ protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+ int reduce, String user) throws IOException {
+ // Do nothing.
+ return null;
+ }
+ @Override
+ protected void populateHeaders(List<String> mapIds, String jobId,
+ String user, int reduce, HttpRequest request,
+ HttpResponse response, boolean keepAliveParam,
+ Map<String, MapOutputInfo> infoMap) throws IOException {
+ // Do nothing.
+ }
+ @Override
+ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+ Channel ch, String user, String mapId, int reduce,
+ MapOutputInfo info) throws IOException {
+
+ ShuffleHeader header =
+ new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+ DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ dob = new DataOutputBuffer();
+ for (int i = 0; i < 100; ++i) {
+ header.write(dob);
+ }
+ return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ }
+ };
+ }
+ }
+
/**
* Test the validation of ShuffleHandler's meta-data's serialization and
* de-serialization.
@@ -934,4 +983,84 @@ public class TestShuffleHandler {
FileUtil.fullyDelete(absLogDir);
}
}
+
+ @Test(timeout = 4000)
+ public void testSendMapCount() throws Exception {
+ final List<ShuffleHandler.ReduceMapFileCount> listenerList =
+ new ArrayList<ShuffleHandler.ReduceMapFileCount>();
+
+ final ChannelHandlerContext mockCtx =
+ Mockito.mock(ChannelHandlerContext.class);
+ final MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
+ final Channel mockCh = Mockito.mock(AbstractChannel.class);
+
+ // Mock HttpRequest and ChannelFuture
+ final HttpRequest mockHttpRequest = createMockHttpRequest();
+ final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
+ listenerList);
+
+ // Mock Netty Channel Context and Channel behavior
+ Mockito.doReturn(mockCh).when(mockCtx).getChannel();
+ Mockito.when(mockCtx.getChannel()).thenReturn(mockCh);
+ Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
+ Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture);
+
+ //Mock MessageEvent behavior
+ Mockito.doReturn(mockCh).when(mockEvt).getChannel();
+ Mockito.when(mockEvt.getChannel()).thenReturn(mockCh);
+ Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();
+
+ final ShuffleHandler sh = new MockShuffleHandler();
+ Configuration conf = new Configuration();
+ sh.init(conf);
+ sh.start();
+ int maxOpenFiles =conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
+ ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
+ sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
+ assertTrue("Number of Open files should not exceed the configured " +
+ "value!-Not Expected",
+ listenerList.size() <= maxOpenFiles);
+ while(!listenerList.isEmpty()) {
+ listenerList.remove(0).operationComplete(mockFuture);
+ assertTrue("Number of Open files should not exceed the configured " +
+ "value!-Not Expected",
+ listenerList.size() <= maxOpenFiles);
+ }
+ sh.close();
+ }
+
+ public ChannelFuture createMockChannelFuture(Channel mockCh,
+ final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
+ final ChannelFuture mockFuture = Mockito.mock(ChannelFuture.class);
+ Mockito.when(mockFuture.getChannel()).thenReturn(mockCh);
+ Mockito.doReturn(true).when(mockFuture).isSuccess();
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ //Add ReduceMapFileCount listener to a list
+ if (invocation.getArguments()[0].getClass() ==
+ ShuffleHandler.ReduceMapFileCount.class)
+ listenerList.add((ShuffleHandler.ReduceMapFileCount)
+ invocation.getArguments()[0]);
+ return null;
+ }
+ }).when(mockFuture).addListener(Mockito.any(
+ ShuffleHandler.ReduceMapFileCount.class));
+ return mockFuture;
+ }
+
+ public HttpRequest createMockHttpRequest() {
+ HttpRequest mockHttpRequest = Mockito.mock(HttpRequest.class);
+ Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod();
+ Mockito.doAnswer(new Answer() {
+ @Override
+ public Object answer(InvocationOnMock invocation) throws Throwable {
+ String uri = "/mapOutput?job=job_12345_1&reduce=1";
+ for (int i = 0; i < 100; i++)
+ uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
+ return uri;
+ }
+ }).when(mockHttpRequest).getUri();
+ return mockHttpRequest;
+ }
}