You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by de...@apache.org on 2015/07/06 10:20:44 UTC

hadoop git commit: MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo if mapId is not in the cache. Contributed by zhihai xu.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 688617d6d -> bff67dfe2


MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to
getMapOutputInfo if mapId is not in the cache. Contributed by zhihai xu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/bff67dfe
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/bff67dfe
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/bff67dfe

Branch: refs/heads/trunk
Commit: bff67dfe2f811654ffb1bbcbd87509c185f452b6
Parents: 688617d
Author: Devaraj K <de...@apache.org>
Authored: Mon Jul 6 13:46:37 2015 +0530
Committer: Devaraj K <de...@apache.org>
Committed: Mon Jul 6 13:46:37 2015 +0530

----------------------------------------------------------------------
 hadoop-mapreduce-project/CHANGES.txt            |   3 +
 .../apache/hadoop/mapred/ShuffleHandler.java    |   3 +-
 .../hadoop/mapred/TestShuffleHandler.java       | 101 +++++++++++++++++++
 3 files changed, 106 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 2f80615..2458403 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -542,6 +542,9 @@ Release 2.7.2 - UNRELEASED
 
   BUG FIXES
 
+    MAPREDUCE-6425. ShuffleHandler passes wrong "base" parameter to getMapOutputInfo
+    if mapId is not in the cache. (zhihai xu via devaraj)
+
 Release 2.7.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
index eedf42b..ee1be23 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
@@ -815,7 +815,8 @@ public class ShuffleHandler extends AuxiliaryService {
         try {
           MapOutputInfo info = mapOutputInfoMap.get(mapId);
           if (info == null) {
-            info = getMapOutputInfo(outputBasePathStr, mapId, reduceId, user);
+            info = getMapOutputInfo(outputBasePathStr + mapId,
+                mapId, reduceId, user);
           }
           lastMap =
               sendMapOutput(ctx, ch, user, mapId,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bff67dfe/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
index 7053653..746071f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
@@ -601,6 +601,7 @@ public class TestShuffleHandler {
       Assert.assertTrue((new String(byteArr)).contains(message));
     } finally {
       shuffleHandler.stop();
+      FileUtil.fullyDelete(absLogDir);
     }
   }
 
@@ -829,4 +830,104 @@ public class TestShuffleHandler {
     conn.disconnect();
     return rc;
   }
+
+  @Test(timeout = 100000)
+  public void testGetMapOutputInfo() throws Exception {
+    final ArrayList<Throwable> failures = new ArrayList<Throwable>(1);
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    File absLogDir = new File("target", TestShuffleHandler.class.
+        getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = ApplicationId.newInstance(12345, 1);
+    String appAttemptId = "attempt_12345_1_m_1_0";
+    String user = "randomUser";
+    String reducerId = "0";
+    List<File> fileMap = new ArrayList<File>();
+    createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected void populateHeaders(List<String> mapIds,
+              String outputBaseStr, String user, int reduce,
+              HttpRequest request, HttpResponse response,
+              boolean keepAliveParam, Map<String, MapOutputInfo> infoMap)
+              throws IOException {
+            // Only set response headers and skip everything else
+            // send some dummy value for content-length
+            super.setResponseHeaders(response, keepAliveParam, 100);
+          }
+          @Override
+          protected void verifyRequest(String appid,
+              ChannelHandlerContext ctx, HttpRequest request,
+              HttpResponse response, URL requestUri) throws IOException {
+            // Do nothing.
+          }
+          @Override
+          protected void sendError(ChannelHandlerContext ctx, String message,
+              HttpResponseStatus status) {
+            if (failures.size() == 0) {
+              failures.add(new Error(message));
+              ctx.getChannel().close();
+            }
+          }
+          @Override
+          protected ChannelFuture sendMapOutput(ChannelHandlerContext ctx,
+              Channel ch, String user, String mapId, int reduce,
+              MapOutputInfo info) throws IOException {
+            // send a shuffle header
+            ShuffleHeader header =
+                new ShuffleHeader("attempt_12345_1_m_1_0", 5678, 5678, 1);
+            DataOutputBuffer dob = new DataOutputBuffer();
+            header.write(dob);
+            return ch.write(wrappedBuffer(dob.getData(), 0, dob.getLength()));
+          }
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    try {
+      shuffleHandler.start();
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt =
+          new Token<JobTokenIdentifier>("identifier".getBytes(),
+          "password".getBytes(), new Text(user), new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffleHandler
+          .initializeApplication(new ApplicationInitializationContext(user,
+          appId, ByteBuffer.wrap(outputBuffer.getData(), 0,
+          outputBuffer.getLength())));
+      URL url =
+          new URL(
+              "http://127.0.0.1:"
+                  + shuffleHandler.getConfig().get(
+                      ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+                  + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+                  + "&map=attempt_12345_1_m_1_0");
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_NAME,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_NAME);
+      conn.setRequestProperty(ShuffleHeader.HTTP_HEADER_VERSION,
+          ShuffleHeader.DEFAULT_HTTP_HEADER_VERSION);
+      conn.connect();
+      try {
+        DataInputStream is = new DataInputStream(conn.getInputStream());
+        ShuffleHeader header = new ShuffleHeader();
+        header.readFields(is);
+        is.close();
+      } catch (EOFException e) {
+        // ignore
+      }
+      Assert.assertEquals(failures.size(), 0);
+    } finally {
+      shuffleHandler.stop();
+      FileUtil.fullyDelete(absLogDir);
+    }
+  }
 }