You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/06/21 21:26:10 UTC

hadoop git commit: MAPREDUCE-6197. Cache MapOutputLocations in ShuffleHandler. Contributed by Junping Du

Repository: hadoop
Updated Branches:
  refs/heads/trunk b2c596cdd -> d8107fcd1


MAPREDUCE-6197. Cache MapOutputLocations in ShuffleHandler. Contributed by Junping Du


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8107fcd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8107fcd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8107fcd

Branch: refs/heads/trunk
Commit: d8107fcd1c93c202925f2946d0cd4072fe0aef1e
Parents: b2c596c
Author: Jian He <ji...@apache.org>
Authored: Tue Jun 21 14:25:58 2016 -0700
Committer: Jian He <ji...@apache.org>
Committed: Tue Jun 21 14:25:58 2016 -0700

----------------------------------------------------------------------
 .../apache/hadoop/mapred/ShuffleHandler.java    | 193 +++++++++++++++----
 .../hadoop/mapred/TestShuffleHandler.java       |  16 +-
 2 files changed, 165 insertions(+), 44 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8107fcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 8cbae81..ed197f2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -131,6 +132,12 @@ import org.mortbay.jetty.HttpHeaders;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.protobuf.ByteString;
 
@@ -156,6 +163,9 @@ public class ShuffleHandler extends AuxiliaryService {
   protected static final Version CURRENT_VERSION_INFO = 
       Version.newInstance(1, 0);
 
+  private static final String DATA_FILE_NAME = "file.out";
+  private static final String INDEX_FILE_NAME = "file.out.index";
+
   private int port;
   private ChannelFactory selector;
   private final ChannelGroup accepted = new DefaultChannelGroup();
@@ -294,12 +304,12 @@ public class ShuffleHandler extends AuxiliaryService {
     private ChannelHandlerContext ctx;
     private String user;
     private Map<String, Shuffle.MapOutputInfo> infoMap;
-    private String outputBasePathStr;
+    private String jobId;
 
     public ReduceContext(List<String> mapIds, int rId,
                          ChannelHandlerContext context, String usr,
                          Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
-                         String outputBasePath) {
+                         String jobId) {
 
       this.mapIds = mapIds;
       this.reduceId = rId;
@@ -319,7 +329,7 @@ public class ShuffleHandler extends AuxiliaryService {
       this.ctx = context;
       this.user = usr;
       this.infoMap = mapOutputInfoMap;
-      this.outputBasePathStr = outputBasePath;
+      this.jobId = jobId;
     }
 
     public int getReduceId() {
@@ -338,8 +348,8 @@ public class ShuffleHandler extends AuxiliaryService {
       return infoMap;
     }
 
-    public String getOutputBasePathStr() {
-      return outputBasePathStr;
+    public String getJobId() {
+      return jobId;
     }
 
     public List<String> getMapIds() {
@@ -780,18 +790,63 @@ public class ShuffleHandler extends AuxiliaryService {
 
   class Shuffle extends SimpleChannelUpstreamHandler {
 
+    private static final int MAX_WEIGHT = 10 * 1024 * 1024;
+    private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
+    private static final int ALLOWED_CONCURRENCY = 16;
     private final Configuration conf;
     private final IndexCache indexCache;
     private final LocalDirAllocator lDirAlloc =
       new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
     private int port;
+    private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache =
+      CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES,
+      TimeUnit.MINUTES).softValues().concurrencyLevel(ALLOWED_CONCURRENCY).
+      removalListener(
+          new RemovalListener<AttemptPathIdentifier, AttemptPathInfo>() {
+            @Override
+            public void onRemoval(RemovalNotification<AttemptPathIdentifier,
+                AttemptPathInfo> notification) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("PathCache Eviction: " + notification.getKey() +
+                    ", Reason=" + notification.getCause());
+              }
+            }
+          }
+      ).maximumWeight(MAX_WEIGHT).weigher(
+          new Weigher<AttemptPathIdentifier, AttemptPathInfo>() {
+            @Override
+            public int weigh(AttemptPathIdentifier key,
+                AttemptPathInfo value) {
+              return key.jobId.length() + key.user.length() +
+                  key.attemptId.length()+
+                  value.indexPath.toString().length() +
+                  value.dataPath.toString().length();
+            }
+          }
+      ).build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() {
+        @Override
+        public AttemptPathInfo load(AttemptPathIdentifier key) throws
+            Exception {
+          String base = getBaseLocation(key.jobId, key.user);
+          String attemptBase = base + key.attemptId;
+          Path indexFileName = lDirAlloc.getLocalPathToRead(
+              attemptBase + "/" + INDEX_FILE_NAME, conf);
+          Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
+              attemptBase + "/" + DATA_FILE_NAME, conf);
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded : " + key + " via loader");
+          }
+          return new AttemptPathInfo(indexFileName, mapOutputFileName);
+        }
+      });
 
     public Shuffle(Configuration conf) {
       this.conf = conf;
       indexCache = new IndexCache(new JobConf(conf));
       this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
     }
-    
+
     public void setPort(int port) {
       this.port = port;
     }
@@ -908,13 +963,8 @@ public class ShuffleHandler extends AuxiliaryService {
       Channel ch = evt.getChannel();
       String user = userRsrc.get(jobId);
 
-      // $x/$user/appcache/$appId/output/$mapId
-      // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
-      // between App and Job
-      String outputBasePathStr = getBaseLocation(jobId, user);
-
       try {
-        populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
+        populateHeaders(mapIds, jobId, user, reduceId, request,
           response, keepAliveParam, mapOutputInfoMap);
       } catch(IOException e) {
         ch.write(response);
@@ -926,7 +976,7 @@ public class ShuffleHandler extends AuxiliaryService {
       ch.write(response);
       //Initialize one ReduceContext object per messageReceived call
       ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
-          user, mapOutputInfoMap, outputBasePathStr);
+          user, mapOutputInfoMap, jobId);
       for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
         ChannelFuture nextMap = sendMap(reduceContext);
         if(nextMap == null) {
@@ -957,9 +1007,8 @@ public class ShuffleHandler extends AuxiliaryService {
         try {
           MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
           if (info == null) {
-            info = getMapOutputInfo(reduceContext.getOutputBasePathStr() +
-                       mapId, mapId, reduceContext.getReduceId(),
-                       reduceContext.getUser());
+            info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
+                reduceContext.getJobId(), reduceContext.getUser());
           }
           nextMap = sendMapOutput(
               reduceContext.getCtx(),
@@ -1003,46 +1052,58 @@ public class ShuffleHandler extends AuxiliaryService {
       return baseStr;
     }
 
-    protected MapOutputInfo getMapOutputInfo(String base, String mapId,
-        int reduce, String user) throws IOException {
-      // Index file
-      Path indexFileName =
-          lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+    protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+        String jobId, String user) throws IOException {
+      AttemptPathInfo pathInfo;
+      try {
+        AttemptPathIdentifier identifier = new AttemptPathIdentifier(
+            jobId, user, mapId);
+        pathInfo = pathCache.get(identifier);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Retrieved pathInfo for " + identifier +
+              " check for corresponding loaded messages to determine whether" +
+              " it was loaded or cached");
+        }
+      } catch (ExecutionException e) {
+        if (e.getCause() instanceof IOException) {
+          throw (IOException) e.getCause();
+        } else {
+          throw new RuntimeException(e.getCause());
+        }
+      }
+
       IndexRecord info =
-          indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+        indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
 
-      Path mapOutputFileName =
-          lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
       if (LOG.isDebugEnabled()) {
-        LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
+        LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId +
+            ",dataFile=" + pathInfo.dataPath + ", indexFile=" +
+            pathInfo.indexPath);
       }
-      MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
+
+      MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info);
       return outputInfo;
     }
 
-    protected void populateHeaders(List<String> mapIds, String outputBaseStr,
+    protected void populateHeaders(List<String> mapIds, String jobId,
         String user, int reduce, HttpRequest request, HttpResponse response,
         boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
         throws IOException {
 
       long contentLength = 0;
       for (String mapId : mapIds) {
-        String base = outputBaseStr + mapId;
-        MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
+        MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
         if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
           mapOutputInfoMap.put(mapId, outputInfo);
         }
-        // Index file
-        Path indexFileName =
-            lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
-        IndexRecord info =
-            indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+
         ShuffleHeader header =
-            new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
+            new ShuffleHeader(mapId, outputInfo.indexRecord.partLength,
+            outputInfo.indexRecord.rawLength, reduce);
         DataOutputBuffer dob = new DataOutputBuffer();
         header.write(dob);
 
-        contentLength += info.partLength;
+        contentLength += outputInfo.indexRecord.partLength;
         contentLength += dob.getLength();
       }
 
@@ -1215,4 +1276,64 @@ public class ShuffleHandler extends AuxiliaryService {
       }
     }
   }
+  
+  static class AttemptPathInfo {
+    // TODO Change this over to just store local dir indices, instead of the
+    // entire path. Far more efficient.
+    private final Path indexPath;
+    private final Path dataPath;
+
+    public AttemptPathInfo(Path indexPath, Path dataPath) {
+      this.indexPath = indexPath;
+      this.dataPath = dataPath;
+    }
+  }
+
+  static class AttemptPathIdentifier {
+    private final String jobId;
+    private final String user;
+    private final String attemptId;
+
+    public AttemptPathIdentifier(String jobId, String user, String attemptId) {
+      this.jobId = jobId;
+      this.user = user;
+      this.attemptId = attemptId;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+
+      AttemptPathIdentifier that = (AttemptPathIdentifier) o;
+
+      if (!attemptId.equals(that.attemptId)) {
+        return false;
+      }
+      if (!jobId.equals(that.jobId)) {
+        return false;
+      }
+
+      return true;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = jobId.hashCode();
+      result = 31 * result + attemptId.hashCode();
+      return result;
+    }
+
+    @Override
+    public String toString() {
+      return "AttemptPathIdentifier{" +
+          "attemptId='" + attemptId + '\'' +
+          ", jobId='" + jobId + '\'' +
+          '}';
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8107fcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 25a622b..1717588 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -110,8 +110,8 @@ public class TestShuffleHandler {
             throws IOException {
         }
         @Override
-        protected MapOutputInfo getMapOutputInfo(String base, String mapId,
-            int reduce, String user) throws IOException {
+        protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+            String jobId, String user) throws IOException {
           // Do nothing.
           return null;
         }
@@ -230,8 +230,8 @@ public class TestShuffleHandler {
         // replace the shuffle handler with one stubbed for testing
         return new Shuffle(conf) {
           @Override
-          protected MapOutputInfo getMapOutputInfo(String base, String mapId,
-              int reduce, String user) throws IOException {
+          protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+              String jobId, String user) throws IOException {
             return null;
           }
           @Override
@@ -325,8 +325,8 @@ public class TestShuffleHandler {
         // replace the shuffle handler with one stubbed for testing
         return new Shuffle(conf) {
           @Override
-          protected MapOutputInfo getMapOutputInfo(String base, String mapId,
-              int reduce, String user) throws IOException {
+          protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+              String jobId, String user) throws IOException {
             return null;
           }
           @Override
@@ -534,8 +534,8 @@ public class TestShuffleHandler {
         // replace the shuffle handler with one stubbed for testing
         return new Shuffle(conf) {
           @Override
-          protected MapOutputInfo getMapOutputInfo(String base, String mapId,
-              int reduce, String user) throws IOException {
+          protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+              String jobId, String user) throws IOException {
             // Do nothing.
             return null;
           }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org