Posted to common-commits@hadoop.apache.org by jl...@apache.org on 2015/09/10 18:05:36 UTC

hadoop git commit: MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file descriptors. Contributed by Kuhu Shukla (cherry picked from commit 8e615588d5216394d0251a9c97bd706537856c6d)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 633344aa5 -> 7909462c3


MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file descriptors. Contributed by Kuhu Shukla
(cherry picked from commit 8e615588d5216394d0251a9c97bd706537856c6d)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/7909462c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/7909462c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/7909462c

Branch: refs/heads/branch-2
Commit: 7909462c3a248f0e1179fbd37122b1cc7489601a
Parents: 633344a
Author: Jason Lowe <jl...@apache.org>
Authored: Thu Sep 10 16:00:17 2015 +0000
Committer: Jason Lowe <jl...@apache.org>
Committed: Thu Sep 10 16:01:27 2015 +0000

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../apache/hadoop/mapred/ShuffleHandler.java    | 177 +++++++++++++++++--
 .../hadoop/mapred/TestShuffleHandler.java       | 129 ++++++++++++++
 3 files changed, 293 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/7909462c/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index bfe0676..ad4c6c5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -321,6 +321,9 @@ Release 2.7.2 - UNRELEASED
     MAPREDUCE-6442. Stack trace is missing when error occurs in client protocol
     provider's constructor (Chang Li via ozawa)
 
+    MAPREDUCE-6474. ShuffleHandler can possibly exhaust nodemanager file
+    descriptors (Kuhu Shukla via jlowe)
+
 Release 2.7.1 - 2015-07-06 
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/7909462c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index ee1be23..7a078a8 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -49,6 +49,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 
 import javax.crypto.SecretKey;
@@ -170,6 +171,7 @@ public class ShuffleHandler extends AuxiliaryService {
   private int maxShuffleConnections;
   private int shuffleBufferSize;
   private boolean shuffleTransferToAllowed;
+  private int maxSessionOpenFiles;
   private ReadaheadPool readaheadPool = ReadaheadPool.getInstance();
 
   private Map<String,String> userRsrc;
@@ -220,6 +222,13 @@ public class ShuffleHandler extends AuxiliaryService {
   public static final boolean WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED = 
       false;
 
+  /* the maximum number of files a single GET request can
+   open simultaneously during shuffle
+   */
+  public static final String SHUFFLE_MAX_SESSION_OPEN_FILES =
+      "mapreduce.shuffle.max.session-open-files";
+  public static final int DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES = 3;
+
   boolean connectionKeepAliveEnabled = false;
   int connectionKeepAliveTimeOut;
   int mapOutputMetaInfoCacheSize;
@@ -248,6 +257,104 @@ public class ShuffleHandler extends AuxiliaryService {
 
   final ShuffleMetrics metrics;
 
+  class ReduceMapFileCount implements ChannelFutureListener {
+
+    private ReduceContext reduceContext;
+
+    public ReduceMapFileCount(ReduceContext rc) {
+      this.reduceContext = rc;
+    }
+
+    @Override
+    public void operationComplete(ChannelFuture future) throws Exception {
+      if (!future.isSuccess()) {
+        future.getChannel().close();
+        return;
+      }
+      int waitCount = this.reduceContext.getMapsToWait().decrementAndGet();
+      if (waitCount == 0) {
+        metrics.operationComplete(future);
+        future.getChannel().close();
+      } else {
+        pipelineFact.getSHUFFLE().sendMap(reduceContext);
+      }
+    }
+  }
+
+  /**
+   * Maintain parameters per messageReceived() Netty context.
+   * Allows sendMapOutput calls from operationComplete()
+   */
+  private static class ReduceContext {
+
+    private List<String> mapIds;
+    private AtomicInteger mapsToWait;
+    private AtomicInteger mapsToSend;
+    private int reduceId;
+    private ChannelHandlerContext ctx;
+    private String user;
+    private Map<String, Shuffle.MapOutputInfo> infoMap;
+    private String outputBasePathStr;
+
+    public ReduceContext(List<String> mapIds, int rId,
+                         ChannelHandlerContext context, String usr,
+                         Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
+                         String outputBasePath) {
+
+      this.mapIds = mapIds;
+      this.reduceId = rId;
+      /**
+      * Atomic count for tracking the no. of map outputs that are yet to
+      * complete. Multiple futureListeners' operationComplete() can decrement
+      * this value asynchronously. It is used to decide when the channel should
+      * be closed.
+      */
+      this.mapsToWait = new AtomicInteger(mapIds.size());
+      /**
+      * Atomic count for tracking the no. of map outputs that have been sent.
+      * Multiple sendMap() calls can increment this value
+      * asynchronously. Used to decide which mapId should be sent next.
+      */
+      this.mapsToSend = new AtomicInteger(0);
+      this.ctx = context;
+      this.user = usr;
+      this.infoMap = mapOutputInfoMap;
+      this.outputBasePathStr = outputBasePath;
+    }
+
+    public int getReduceId() {
+      return reduceId;
+    }
+
+    public ChannelHandlerContext getCtx() {
+      return ctx;
+    }
+
+    public String getUser() {
+      return user;
+    }
+
+    public Map<String, Shuffle.MapOutputInfo> getInfoMap() {
+      return infoMap;
+    }
+
+    public String getOutputBasePathStr() {
+      return outputBasePathStr;
+    }
+
+    public List<String> getMapIds() {
+      return mapIds;
+    }
+
+    public AtomicInteger getMapsToSend() {
+      return mapsToSend;
+    }
+
+    public AtomicInteger getMapsToWait() {
+      return mapsToWait;
+    }
+  }
+
   ShuffleHandler(MetricsSystem ms) {
     super("httpshuffle");
     metrics = ms.register(new ShuffleMetrics());
@@ -357,6 +464,9 @@ public class ShuffleHandler extends AuxiliaryService {
          (Shell.WINDOWS)?WINDOWS_DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED:
                          DEFAULT_SHUFFLE_TRANSFERTO_ALLOWED);
 
+    maxSessionOpenFiles = conf.getInt(SHUFFLE_MAX_SESSION_OPEN_FILES,
+        DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
+
     ThreadFactory bossFactory = new ThreadFactoryBuilder()
       .setNameFormat("ShuffleHandler Netty Boss #%d")
       .build();
@@ -638,6 +748,10 @@ public class ShuffleHandler extends AuxiliaryService {
       }
     }
 
+    public Shuffle getSHUFFLE() {
+      return SHUFFLE;
+    }
+
     public void destroy() {
       if (sslFactory != null) {
         sslFactory.destroy();
@@ -809,31 +923,62 @@ public class ShuffleHandler extends AuxiliaryService {
         return;
       }
       ch.write(response);
-      // TODO refactor the following into the pipeline
-      ChannelFuture lastMap = null;
-      for (String mapId : mapIds) {
+      //Initialize one ReduceContext object per messageReceived call
+      ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
+          user, mapOutputInfoMap, outputBasePathStr);
+      for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
+        ChannelFuture nextMap = sendMap(reduceContext);
+        if(nextMap == null) {
+          return;
+        }
+      }
+    }
+
+    /**
+     * Calls sendMapOutput for the mapId pointed to by ReduceContext.mapsToSend
+     * and increments it. This method is first called by messageReceived()
+     * maxSessionOpenFiles times and then on the completion of every
+     * sendMapOutput operation. This limits the number of open files on a node,
+     * which can get really large (exhausting file descriptors on the NM) if all
+     * sendMapOutputs are called in one go, as was done prior to this change.
+     * @param reduceContext used to call sendMapOutput with correct params.
+     * @return the ChannelFuture of the sendMapOutput, can be null.
+     */
+    public ChannelFuture sendMap(ReduceContext reduceContext)
+        throws Exception {
+
+      ChannelFuture nextMap = null;
+      if (reduceContext.getMapsToSend().get() <
+          reduceContext.getMapIds().size()) {
+        int nextIndex = reduceContext.getMapsToSend().getAndIncrement();
+        String mapId = reduceContext.getMapIds().get(nextIndex);
+
         try {
-          MapOutputInfo info = mapOutputInfoMap.get(mapId);
+          MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
           if (info == null) {
-            info = getMapOutputInfo(outputBasePathStr + mapId,
-                mapId, reduceId, user);
+            info = getMapOutputInfo(reduceContext.getOutputBasePathStr() +
+                       mapId, mapId, reduceContext.getReduceId(),
+                       reduceContext.getUser());
           }
-          lastMap =
-              sendMapOutput(ctx, ch, user, mapId,
-                reduceId, info);
-          if (null == lastMap) {
-            sendError(ctx, NOT_FOUND);
-            return;
+          nextMap = sendMapOutput(
+              reduceContext.getCtx(),
+              reduceContext.getCtx().getChannel(),
+              reduceContext.getUser(), mapId,
+              reduceContext.getReduceId(), info);
+          if (null == nextMap) {
+            sendError(reduceContext.getCtx(), NOT_FOUND);
+            return null;
           }
+          nextMap.addListener(new ReduceMapFileCount(reduceContext));
         } catch (IOException e) {
           LOG.error("Shuffle error :", e);
           String errorMessage = getErrorMessage(e);
-          sendError(ctx,errorMessage , INTERNAL_SERVER_ERROR);
-          return;
+          sendError(reduceContext.getCtx(), errorMessage,
+              INTERNAL_SERVER_ERROR);
+          return null;
         }
       }
-      lastMap.addListener(metrics);
-      lastMap.addListener(ChannelFutureListener.CLOSE);
+      return nextMap;
     }
 
     private String getErrorMessage(Throwable t) {

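Taken together, the ShuffleHandler changes replace the old "send every map output at once" loop with a bounded chain: messageReceived() starts at most maxSessionOpenFiles transfers, and each ReduceMapFileCount completion either closes the channel (once mapsToWait reaches zero) or triggers the next sendMap(). Below is a minimal, self-contained sketch of that pattern; the names (BoundedTransferChain, transfer) are hypothetical stand-ins for Shuffle.sendMap(), sendMapOutput(), and ReduceMapFileCount, and the transfers here complete synchronously rather than on a Netty I/O thread.

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the throttling scheme: at most maxOpenFiles
// transfers are started up front; each completion starts the next one.
public class BoundedTransferChain {
  private final List<String> mapIds;                             // outputs to send
  private final AtomicInteger nextToSend = new AtomicInteger(0); // like mapsToSend
  private final AtomicInteger remaining;                         // like mapsToWait

  public BoundedTransferChain(List<String> mapIds) {
    this.mapIds = mapIds;
    this.remaining = new AtomicInteger(mapIds.size());
  }

  // Mirrors messageReceived(): kick off at most maxOpenFiles transfers.
  public void start(int maxOpenFiles) {
    for (int i = 0; i < Math.min(maxOpenFiles, mapIds.size()); i++) {
      sendNext();
    }
  }

  // Mirrors sendMap(): send the next output and register a completion
  // callback that plays the role of ReduceMapFileCount.operationComplete().
  private void sendNext() {
    int index = nextToSend.getAndIncrement();
    if (index >= mapIds.size()) {
      return;
    }
    transfer(mapIds.get(index)).whenComplete((ok, err) -> {
      if (err != null) {
        System.out.println("transfer failed; the real handler closes the channel");
      } else if (remaining.decrementAndGet() == 0) {
        System.out.println("all outputs sent; the real handler closes the channel");
      } else {
        sendNext();   // keeps open files at or below maxOpenFiles
      }
    });
  }

  // Stand-in for sendMapOutput(); a real transfer is an asynchronous Netty write.
  private CompletableFuture<Void> transfer(String mapId) {
    System.out.println("sending " + mapId);
    return CompletableFuture.completedFuture(null);
  }

  public static void main(String[] args) {
    new BoundedTransferChain(
        Arrays.asList("m_0", "m_1", "m_2", "m_3", "m_4")).start(3);
  }
}

With asynchronous completions and the default limit of three, at most three transfers are in flight at any time; the synchronous stand-in above simply walks the same control flow in order.
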
http://git-wip-us.apache.org/repos/asf/hadoop/blob/7909462c/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 878bc87..5cb6050 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.test.MetricsAsserts.assertGauge;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
 import static org.apache.hadoop.test.MockitoMaker.make;
 import static org.apache.hadoop.test.MockitoMaker.stub;
+import static org.junit.Assert.assertTrue;
 import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 import static org.jboss.netty.handler.codec.http.HttpResponseStatus.OK;
 import static org.jboss.netty.handler.codec.http.HttpVersion.HTTP_1_1;
@@ -79,18 +80,66 @@ import org.apache.hadoop.yarn.server.records.Version;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.AbstractChannel;
 import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
 import org.jboss.netty.handler.codec.http.HttpRequest;
 import org.jboss.netty.handler.codec.http.HttpResponse;
 import org.jboss.netty.handler.codec.http.HttpResponseStatus;
+import org.jboss.netty.handler.codec.http.HttpMethod;
 import org.junit.Assert;
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.mockito.Mockito;
 import org.mortbay.jetty.HttpHeaders;
 
 public class TestShuffleHandler {
   static final long MiB = 1024 * 1024; 
   private static final Log LOG = LogFactory.getLog(TestShuffleHandler.class);
 
+  class MockShuffleHandler extends org.apache.hadoop.mapred.ShuffleHandler {
+    @Override
+    protected Shuffle getShuffle(final Configuration conf) {
+      return new Shuffle(conf) {
+        @Override
+        protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+            HttpRequest request, HttpResponse response, URL requestUri)
+            throws IOException {
+        }
+        @Override
+        protected MapOutputInfo getMapOutputInfo(String base, String mapId,
+            int reduce, String user) throws IOException {
+          // Do nothing.
+          return null;
+        }
+        @Override
+        protected void populateHeaders(List<String> mapIds, String jobId,
+            String user, int reduce, HttpRequest request,
+            HttpResponse response, boolean keepAliveParam,
+            Map<String, MapOutputInfo> infoMap) throws IOException {
+          // Do nothing.
+        }
+        @Override
+        protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+            Channel ch, String user, String mapId, int reduce,
+            MapOutputInfo info) throws IOException {
+
+          ShuffleHeader header =
+              new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+          DataOutputBuffer dob = new DataOutputBuffer();
+          header.write(dob);
+          ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+          dob = new DataOutputBuffer();
+          for (int i = 0; i < 100; ++i) {
+            header.write(dob);
+          }
+          return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+        }
+      };
+    }
+  }
+
   /**
    * Test the validation of ShuffleHandler's meta-data's serialization and
    * de-serialization.
@@ -934,4 +983,84 @@ public class TestShuffleHandler {
       FileUtil.fullyDelete(absLogDir);
     }
   }
+
+  @Test(timeout = 4000)
+  public void testSendMapCount() throws Exception {
+    final List<ShuffleHandler.ReduceMapFileCount> listenerList =
+        new ArrayList<ShuffleHandler.ReduceMapFileCount>();
+
+    final ChannelHandlerContext mockCtx =
+        Mockito.mock(ChannelHandlerContext.class);
+    final MessageEvent mockEvt = Mockito.mock(MessageEvent.class);
+    final Channel mockCh = Mockito.mock(AbstractChannel.class);
+
+    // Mock HttpRequest and ChannelFuture
+    final HttpRequest mockHttpRequest = createMockHttpRequest();
+    final ChannelFuture mockFuture = createMockChannelFuture(mockCh,
+        listenerList);
+
+    // Mock Netty Channel Context and Channel behavior
+    Mockito.doReturn(mockCh).when(mockCtx).getChannel();
+    Mockito.when(mockCtx.getChannel()).thenReturn(mockCh);
+    Mockito.doReturn(mockFuture).when(mockCh).write(Mockito.any(Object.class));
+    Mockito.when(mockCh.write(Object.class)).thenReturn(mockFuture);
+
+    //Mock MessageEvent behavior
+    Mockito.doReturn(mockCh).when(mockEvt).getChannel();
+    Mockito.when(mockEvt.getChannel()).thenReturn(mockCh);
+    Mockito.doReturn(mockHttpRequest).when(mockEvt).getMessage();
+
+    final ShuffleHandler sh = new MockShuffleHandler();
+    Configuration conf = new Configuration();
+    sh.init(conf);
+    sh.start();
+    int maxOpenFiles = conf.getInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES,
+        ShuffleHandler.DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES);
+    sh.getShuffle(conf).messageReceived(mockCtx, mockEvt);
+    assertTrue("Number of Open files should not exceed the configured " +
+            "value!-Not Expected",
+        listenerList.size() <= maxOpenFiles);
+    while(!listenerList.isEmpty()) {
+      listenerList.remove(0).operationComplete(mockFuture);
+      assertTrue("Number of Open files should not exceed the configured " +
+              "value!-Not Expected",
+          listenerList.size() <= maxOpenFiles);
+    }
+    sh.close();
+  }
+
+  public ChannelFuture createMockChannelFuture(Channel mockCh,
+      final List<ShuffleHandler.ReduceMapFileCount> listenerList) {
+    final ChannelFuture mockFuture = Mockito.mock(ChannelFuture.class);
+    Mockito.when(mockFuture.getChannel()).thenReturn(mockCh);
+    Mockito.doReturn(true).when(mockFuture).isSuccess();
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        //Add ReduceMapFileCount listener to a list
+        if (invocation.getArguments()[0].getClass() ==
+            ShuffleHandler.ReduceMapFileCount.class)
+          listenerList.add((ShuffleHandler.ReduceMapFileCount)
+              invocation.getArguments()[0]);
+        return null;
+      }
+    }).when(mockFuture).addListener(Mockito.any(
+        ShuffleHandler.ReduceMapFileCount.class));
+    return mockFuture;
+  }
+
+  public HttpRequest createMockHttpRequest() {
+    HttpRequest mockHttpRequest = Mockito.mock(HttpRequest.class);
+    Mockito.doReturn(HttpMethod.GET).when(mockHttpRequest).getMethod();
+    Mockito.doAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) throws Throwable {
+        String uri = "/mapOutput?job=job_12345_1&reduce=1";
+        for (int i = 0; i < 100; i++)
+          uri = uri.concat("&map=attempt_12345_1_m_" + i + "_0");
+        return uri;
+      }
+    }).when(mockHttpRequest).getUri();
+    return mockHttpRequest;
+  }
 }
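
The knob added by this patch is mapreduce.shuffle.max.session-open-files (default 3, DEFAULT_SHUFFLE_MAX_SESSION_OPEN_FILES), which caps how many map output files one shuffle GET request holds open at once. On a real cluster it would normally be set in the NodeManager's configuration; the sketch below is not part of the commit and merely follows the init/start pattern used by testSendMapCount() to show how the property could be raised when driving a ShuffleHandler directly.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.ShuffleHandler;

public class ShuffleOpenFilesExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Allow a single shuffle GET request to keep up to 5 map output files
    // open at once (the default introduced by this patch is 3).
    conf.setInt(ShuffleHandler.SHUFFLE_MAX_SESSION_OPEN_FILES, 5);

    ShuffleHandler handler = new ShuffleHandler();
    handler.init(conf);   // the handler reads SHUFFLE_MAX_SESSION_OPEN_FILES during init
    handler.start();
    // ... the handler now serves shuffle requests with the new limit ...
    handler.stop();
  }
}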