You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/06/21 21:26:10 UTC
hadoop git commit: MAPREDUCE-6197. Cache MapOutputLocations in ShuffleHandler. Contributed by Junping Du
Repository: hadoop
Updated Branches:
refs/heads/trunk b2c596cdd -> d8107fcd1
MAPREDUCE-6197. Cache MapOutputLocations in ShuffleHandler. Contributed by Junping Du
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d8107fcd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d8107fcd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d8107fcd
Branch: refs/heads/trunk
Commit: d8107fcd1c93c202925f2946d0cd4072fe0aef1e
Parents: b2c596c
Author: Jian He <ji...@apache.org>
Authored: Tue Jun 21 14:25:58 2016 -0700
Committer: Jian He <ji...@apache.org>
Committed: Tue Jun 21 14:25:58 2016 -0700
----------------------------------------------------------------------
.../apache/hadoop/mapred/ShuffleHandler.java | 193 +++++++++++++++----
.../hadoop/mapred/TestShuffleHandler.java | 16 +-
2 files changed, 165 insertions(+), 44 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8107fcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index 8cbae81..ed197f2 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -46,6 +46,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -131,6 +132,12 @@ import org.mortbay.jetty.HttpHeaders;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
+import com.google.common.cache.RemovalListener;
+import com.google.common.cache.RemovalNotification;
+import com.google.common.cache.Weigher;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
@@ -156,6 +163,9 @@ public class ShuffleHandler extends AuxiliaryService {
protected static final Version CURRENT_VERSION_INFO =
Version.newInstance(1, 0);
+ private static final String DATA_FILE_NAME = "file.out";
+ private static final String INDEX_FILE_NAME = "file.out.index";
+
private int port;
private ChannelFactory selector;
private final ChannelGroup accepted = new DefaultChannelGroup();
@@ -294,12 +304,12 @@ public class ShuffleHandler extends AuxiliaryService {
private ChannelHandlerContext ctx;
private String user;
private Map<String, Shuffle.MapOutputInfo> infoMap;
- private String outputBasePathStr;
+ private String jobId;
public ReduceContext(List<String> mapIds, int rId,
ChannelHandlerContext context, String usr,
Map<String, Shuffle.MapOutputInfo> mapOutputInfoMap,
- String outputBasePath) {
+ String jobId) {
this.mapIds = mapIds;
this.reduceId = rId;
@@ -319,7 +329,7 @@ public class ShuffleHandler extends AuxiliaryService {
this.ctx = context;
this.user = usr;
this.infoMap = mapOutputInfoMap;
- this.outputBasePathStr = outputBasePath;
+ this.jobId = jobId;
}
public int getReduceId() {
@@ -338,8 +348,8 @@ public class ShuffleHandler extends AuxiliaryService {
return infoMap;
}
- public String getOutputBasePathStr() {
- return outputBasePathStr;
+ public String getJobId() {
+ return jobId;
}
public List<String> getMapIds() {
@@ -780,18 +790,63 @@ public class ShuffleHandler extends AuxiliaryService {
class Shuffle extends SimpleChannelUpstreamHandler {
+ private static final int MAX_WEIGHT = 10 * 1024 * 1024;
+ private static final int EXPIRE_AFTER_ACCESS_MINUTES = 5;
+ private static final int ALLOWED_CONCURRENCY = 16;
private final Configuration conf;
private final IndexCache indexCache;
private final LocalDirAllocator lDirAlloc =
new LocalDirAllocator(YarnConfiguration.NM_LOCAL_DIRS);
private int port;
+ private final LoadingCache<AttemptPathIdentifier, AttemptPathInfo> pathCache =
+ CacheBuilder.newBuilder().expireAfterAccess(EXPIRE_AFTER_ACCESS_MINUTES,
+ TimeUnit.MINUTES).softValues().concurrencyLevel(ALLOWED_CONCURRENCY).
+ removalListener(
+ new RemovalListener<AttemptPathIdentifier, AttemptPathInfo>() {
+ @Override
+ public void onRemoval(RemovalNotification<AttemptPathIdentifier,
+ AttemptPathInfo> notification) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("PathCache Eviction: " + notification.getKey() +
+ ", Reason=" + notification.getCause());
+ }
+ }
+ }
+ ).maximumWeight(MAX_WEIGHT).weigher(
+ new Weigher<AttemptPathIdentifier, AttemptPathInfo>() {
+ @Override
+ public int weigh(AttemptPathIdentifier key,
+ AttemptPathInfo value) {
+ return key.jobId.length() + key.user.length() +
+ key.attemptId.length()+
+ value.indexPath.toString().length() +
+ value.dataPath.toString().length();
+ }
+ }
+ ).build(new CacheLoader<AttemptPathIdentifier, AttemptPathInfo>() {
+ @Override
+ public AttemptPathInfo load(AttemptPathIdentifier key) throws
+ Exception {
+ String base = getBaseLocation(key.jobId, key.user);
+ String attemptBase = base + key.attemptId;
+ Path indexFileName = lDirAlloc.getLocalPathToRead(
+ attemptBase + "/" + INDEX_FILE_NAME, conf);
+ Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
+ attemptBase + "/" + DATA_FILE_NAME, conf);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Loaded : " + key + " via loader");
+ }
+ return new AttemptPathInfo(indexFileName, mapOutputFileName);
+ }
+ });
public Shuffle(Configuration conf) {
this.conf = conf;
indexCache = new IndexCache(new JobConf(conf));
this.port = conf.getInt(SHUFFLE_PORT_CONFIG_KEY, DEFAULT_SHUFFLE_PORT);
}
-
+
public void setPort(int port) {
this.port = port;
}
@@ -908,13 +963,8 @@ public class ShuffleHandler extends AuxiliaryService {
Channel ch = evt.getChannel();
String user = userRsrc.get(jobId);
- // $x/$user/appcache/$appId/output/$mapId
- // TODO: Once Shuffle is out of NM, this can use MR APIs to convert
- // between App and Job
- String outputBasePathStr = getBaseLocation(jobId, user);
-
try {
- populateHeaders(mapIds, outputBasePathStr, user, reduceId, request,
+ populateHeaders(mapIds, jobId, user, reduceId, request,
response, keepAliveParam, mapOutputInfoMap);
} catch(IOException e) {
ch.write(response);
@@ -926,7 +976,7 @@ public class ShuffleHandler extends AuxiliaryService {
ch.write(response);
//Initialize one ReduceContext object per messageReceived call
ReduceContext reduceContext = new ReduceContext(mapIds, reduceId, ctx,
- user, mapOutputInfoMap, outputBasePathStr);
+ user, mapOutputInfoMap, jobId);
for (int i = 0; i < Math.min(maxSessionOpenFiles, mapIds.size()); i++) {
ChannelFuture nextMap = sendMap(reduceContext);
if(nextMap == null) {
@@ -957,9 +1007,8 @@ public class ShuffleHandler extends AuxiliaryService {
try {
MapOutputInfo info = reduceContext.getInfoMap().get(mapId);
if (info == null) {
- info = getMapOutputInfo(reduceContext.getOutputBasePathStr() +
- mapId, mapId, reduceContext.getReduceId(),
- reduceContext.getUser());
+ info = getMapOutputInfo(mapId, reduceContext.getReduceId(),
+ reduceContext.getJobId(), reduceContext.getUser());
}
nextMap = sendMapOutput(
reduceContext.getCtx(),
@@ -1003,46 +1052,58 @@ public class ShuffleHandler extends AuxiliaryService {
return baseStr;
}
- protected MapOutputInfo getMapOutputInfo(String base, String mapId,
- int reduce, String user) throws IOException {
- // Index file
- Path indexFileName =
- lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
+ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+ String jobId, String user) throws IOException {
+ AttemptPathInfo pathInfo;
+ try {
+ AttemptPathIdentifier identifier = new AttemptPathIdentifier(
+ jobId, user, mapId);
+ pathInfo = pathCache.get(identifier);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrieved pathInfo for " + identifier +
+ " check for corresponding loaded messages to determine whether" +
+ " it was loaded or cached");
+ }
+ } catch (ExecutionException e) {
+ if (e.getCause() instanceof IOException) {
+ throw (IOException) e.getCause();
+ } else {
+ throw new RuntimeException(e.getCause());
+ }
+ }
+
IndexRecord info =
- indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+ indexCache.getIndexInformation(mapId, reduce, pathInfo.indexPath, user);
- Path mapOutputFileName =
- lDirAlloc.getLocalPathToRead(base + "/file.out", conf);
if (LOG.isDebugEnabled()) {
- LOG.debug(base + " : " + mapOutputFileName + " : " + indexFileName);
+ LOG.debug("getMapOutputInfo: jobId=" + jobId + ", mapId=" + mapId +
+ ",dataFile=" + pathInfo.dataPath + ", indexFile=" +
+ pathInfo.indexPath);
}
- MapOutputInfo outputInfo = new MapOutputInfo(mapOutputFileName, info);
+
+ MapOutputInfo outputInfo = new MapOutputInfo(pathInfo.dataPath, info);
return outputInfo;
}
- protected void populateHeaders(List<String> mapIds, String outputBaseStr,
+ protected void populateHeaders(List<String> mapIds, String jobId,
String user, int reduce, HttpRequest request, HttpResponse response,
boolean keepAliveParam, Map<String, MapOutputInfo> mapOutputInfoMap)
throws IOException {
long contentLength = 0;
for (String mapId : mapIds) {
- String base = outputBaseStr + mapId;
- MapOutputInfo outputInfo = getMapOutputInfo(base, mapId, reduce, user);
+ MapOutputInfo outputInfo = getMapOutputInfo(mapId, reduce, jobId, user);
if (mapOutputInfoMap.size() < mapOutputMetaInfoCacheSize) {
mapOutputInfoMap.put(mapId, outputInfo);
}
- // Index file
- Path indexFileName =
- lDirAlloc.getLocalPathToRead(base + "/file.out.index", conf);
- IndexRecord info =
- indexCache.getIndexInformation(mapId, reduce, indexFileName, user);
+
ShuffleHeader header =
- new ShuffleHeader(mapId, info.partLength, info.rawLength, reduce);
+ new ShuffleHeader(mapId, outputInfo.indexRecord.partLength,
+ outputInfo.indexRecord.rawLength, reduce);
DataOutputBuffer dob = new DataOutputBuffer();
header.write(dob);
- contentLength += info.partLength;
+ contentLength += outputInfo.indexRecord.partLength;
contentLength += dob.getLength();
}
@@ -1215,4 +1276,64 @@ public class ShuffleHandler extends AuxiliaryService {
}
}
}
+
+ static class AttemptPathInfo {
+ // TODO Change this over to just store local dir indices, instead of the
+ // entire path. Far more efficient.
+ private final Path indexPath;
+ private final Path dataPath;
+
+ public AttemptPathInfo(Path indexPath, Path dataPath) {
+ this.indexPath = indexPath;
+ this.dataPath = dataPath;
+ }
+ }
+
+ static class AttemptPathIdentifier {
+ private final String jobId;
+ private final String user;
+ private final String attemptId;
+
+ public AttemptPathIdentifier(String jobId, String user, String attemptId) {
+ this.jobId = jobId;
+ this.user = user;
+ this.attemptId = attemptId;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ AttemptPathIdentifier that = (AttemptPathIdentifier) o;
+
+ if (!attemptId.equals(that.attemptId)) {
+ return false;
+ }
+ if (!jobId.equals(that.jobId)) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = jobId.hashCode();
+ result = 31 * result + attemptId.hashCode();
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "AttemptPathIdentifier{" +
+ "attemptId='" + attemptId + '\'' +
+ ", jobId='" + jobId + '\'' +
+ '}';
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d8107fcd/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 25a622b..1717588 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -110,8 +110,8 @@ public class TestShuffleHandler {
throws IOException {
}
@Override
- protected MapOutputInfo getMapOutputInfo(String base, String mapId,
- int reduce, String user) throws IOException {
+ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+ String jobId, String user) throws IOException {
// Do nothing.
return null;
}
@@ -230,8 +230,8 @@ public class TestShuffleHandler {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
- protected MapOutputInfo getMapOutputInfo(String base, String mapId,
- int reduce, String user) throws IOException {
+ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+ String jobId, String user) throws IOException {
return null;
}
@Override
@@ -325,8 +325,8 @@ public class TestShuffleHandler {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
- protected MapOutputInfo getMapOutputInfo(String base, String mapId,
- int reduce, String user) throws IOException {
+ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+ String jobId, String user) throws IOException {
return null;
}
@Override
@@ -534,8 +534,8 @@ public class TestShuffleHandler {
// replace the shuffle handler with one stubbed for testing
return new Shuffle(conf) {
@Override
- protected MapOutputInfo getMapOutputInfo(String base, String mapId,
- int reduce, String user) throws IOException {
+ protected MapOutputInfo getMapOutputInfo(String mapId, int reduce,
+ String jobId, String user) throws IOException {
// Do nothing.
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org