You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by de...@apache.org on 2015/07/06 10:20:44 UTC
hadoop git commit: MAPREDUCE-6425. ShuffleHandler passes wrong "base"
parameter to getMapOutputInfo if mapId is not in the cache. Contributed by
zhihai xu.
Repository: hadoop
Updated Branches:
refs/heads/trunk 688617d6d -> bff67dfe2
MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to
getMapOutputInfo if mapId is not in the cache. Contributed by zhihai xu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff67dfe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff67dfe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff67dfe
Branch: refs/heads/trunk
Commit: bff67dfe2f811654ffb1bbcbd87509c185f452b6
Parents: 688617d
Author: Devaraj K <de...@apache.org>
Authored: Mon Jul 6 13:46:37 2015 +0530
Committer: Devaraj K <de...@apache.org>
Committed: Mon Jul 6 13:46:37 2015 +0530
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../apache/hadoop/mapred/ShuffleHandler.java | 3 +-
.../hadoop/mapred/TestShuffleHandler.java | 101 +++++++++++++++++++
3 files changed, 106 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2f80615..2458403 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -542,6 +542,9 @@ Release 2.7.2 - UNRELEASED
BUG FIXES
+ MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo
+ if mapId is not in the cache. (zhihai xu via devaraj)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index eedf42b..ee1be23 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -815,7 +815,8 @@ public class ShuffleHandler extends AuxiliaryService {
try {
MapOutputInfo info = mapOutputInfoMap.get(mapId);
if (info == null) {
- info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
+ info = getMapOutputInfo(outputBasePathStr + mapId,
+ mapId, reduceId, user);
}
lastMap =
sendMapOutput(ctx, ch, user, mapId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 7053653..746071f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -601,6 +601,7 @@ public class TestShuffleHandler {
Assert.assertTrue((new String(byteArr)).contains(message));
} finally {
shuffleHandler.stop();
+ FileUtil.fullyDelete(absLogDir);
}
}
@@ -829,4 +830,104 @@ public class TestShuffleHandler {
conn.disconnect();
return rc;
}
+
+ @Test(timeout = 100000) // MAPREDUCE-6425 regression test: passes only if the stubbed sendError below is never invoked
+ public void testGetMapOutputInfo() throws Exception {
+ final ArrayList<Throwable> failures = new ArrayList<Throwable>(1); // receives the first error reported through sendError
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0); // port 0 - presumably an ephemeral port; the value is read back from getConfig() when the URL is built
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ File absLogDir = new File("target", TestShuffleHandler.class.
+ getSimpleName() + "LocDir").getAbsoluteFile();
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath()); // note: despite its name, absLogDir is registered as the NM local dir
+ ApplicationId appId = ApplicationId.newInstance(12345, 1);
+ String appAttemptId = "attempt_12345_1_m_1_0";
+ String user = "randomUser";
+ String reducerId = "0";
+ List<File> fileMap = new ArrayList<File>();
+ createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId, // helper defined outside this hunk; presumably creates the on-disk map output files - TODO confirm
+ conf, fileMap);
+ ShuffleHandler shuffleHandler = new ShuffleHandler() {
+ @Override
+ protected Shuffle getShuffle(Configuration conf) {
+ // replace the shuffle handler with one stubbed for testing: header population,
+ return new Shuffle(conf) { // request verification, error reporting and map output sending are all overridden
+ @Override
+ protected void populateHeaders(List<String> mapIds,
+ String outputBaseStr, String user, int reduce,
+ HttpRequest request, HttpResponse response,
+ boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
+ throws IOException {
+ // Only set response headers and skip everything else; infoMap is left unpopulated here,
+ // send some dummy value for content-length (appears to be what forces the cache-miss path fixed by this commit)
+ super.setResponseHeaders(response, keepAliveParam, 100);
+ }
+ @Override
+ protected void verifyRequest(String appid,
+ ChannelHandlerContext ctx, HttpRequest request,
+ HttpResponse response, URL requestUri) throws IOException {
+ // Do nothing: request verification is skipped in this test.
+ }
+ @Override
+ protected void sendError(ChannelHandlerContext ctx, String message,
+ HttpResponseStatus status) {
+ if (failures.size() == 0) { // record only the first error, then close the channel
+ failures.add(new Error(message));
+ ctx.getChannel().close();
+ }
+ }
+ @Override
+ protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+ Channel ch, String user, String mapId, int reduce,
+ MapOutputInfo info) throws IOException {
+ // send a shuffle header only; no map output data is written after it
+ ShuffleHeader header =
+ new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1); // same attempt id as requested in the URL below; numeric values look like placeholders
+ DataOutputBuffer dob = new DataOutputBuffer();
+ header.write(dob);
+ return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+ }
+ };
+ }
+ };
+ shuffleHandler.init(conf);
+ try {
+ shuffleHandler.start();
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt = // dummy job token, serialized and passed to initializeApplication below
+ new Token<JobTokenIdentifier>("identifier".getBytes(),
+ "password".getBytes(), new Text(user), new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffleHandler
+ .initializeApplication(new ApplicationInitializationContext(user,
+ appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+ outputBuffer.getLength())));
+ URL url =
+ new URL(
+ "http://127.0.0.1:"
+ + shuffleHandler.getConfig().get(
+ ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+ + "&map=attempt_12345_1_m_1_0");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+ conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+ ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+ conn.connect();
+ try {
+ DataInputStream is = new DataInputStream(conn.getInputStream());
+ ShuffleHeader header = new ShuffleHeader(); // parsed only to drain the response; its fields are never checked
+ header.readFields(is);
+ is.close(); // NOTE(review): not reached if readFields throws, leaving the stream unclosed
+ } catch (EOFException e) {
+ // ignore: a truncated response is tolerated; the verdict comes solely from the failures list
+ }
+ Assert.assertEquals(failures.size(), 0); // NOTE(review): JUnit's order is (expected, actual); swapped here, which only affects the failure message
+ } finally {
+ shuffleHandler.stop();
+ FileUtil.fullyDelete(absLogDir); // remove the directory tree created for this test
+ }
+ }
}