You are viewing a plain-text version of this content. The hyperlink to its canonical version ("here") was lost in the conversion to plain text.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/05/12 23:59:48 UTC

svn commit: r1481658 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/o...

Author: vinodkv
Date: Sun May 12 21:59:48 2013
New Revision: 1481658

URL: http://svn.apache.org/r1481658
Log:
MAPREDUCE-5208. Modified ShuffleHandler to use SecureIOUtils for reading local files. Contributed by Omkar Vinit Joshi.
svn merge --ignore-ancestry -c 1481657 ../../trunk/

Added:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf
      - copied unchanged from r1481657, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf
Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1481658&r1=1481657&r2=1481658&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Sun May 12 21:59:48 2013
@@ -239,6 +239,9 @@ Release 2.0.5-beta - UNRELEASED
     MAPREDUCE-5239. Updated MR App to reflect YarnRemoteException changes after
     YARN-634. (Siddharth Seth via vinodkv)
 
+    MAPREDUCE-5208. Modified ShuffleHandler to use SecureIOUtils for reading
+    local files. (Omkar Vinit Joshi via vinodkv)
+
 Release 2.0.4-alpha - 2013-04-25
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1481658&r1=1481657&r2=1481658&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java Sun May 12 21:59:48 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
 
 import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
 
+import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.LongBuffer;
@@ -34,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.util.PureJavaCrc32;
 
 @InterfaceAudience.LimitedPrivate({"MapReduce"})
@@ -65,17 +67,19 @@ public class SpillRecord {
       throws IOException {
 
     final FileSystem rfs = FileSystem.getLocal(job).getRaw();
-    final FSDataInputStream in = rfs.open(indexFileName);
+    final FSDataInputStream in =
+        SecureIOUtils.openFSDataInputStream(new File(indexFileName.toUri()
+            .getRawPath()), expectedIndexOwner, null);
     try {
       final long length = rfs.getFileStatus(indexFileName).getLen();
       final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
       final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-
       buf = ByteBuffer.allocate(size);
       if (crc != null) {
         crc.reset();
         CheckedInputStream chk = new CheckedInputStream(in, crc);
         IOUtils.readFully(chk, buf.array(), 0, size);
+        
         if (chk.getChecksum().getValue() != in.readLong()) {
           throw new ChecksumException("Checksum error reading spill index: " +
                                 indexFileName, -1);

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1481658&r1=1481657&r2=1481658&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Sun May 12 21:59:48 2013
@@ -58,9 +58,9 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataInputByteBuffer;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.SecureIOUtils;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
-import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -71,6 +71,7 @@ import org.apache.hadoop.metrics2.lib.De
 import org.apache.hadoop.metrics2.lib.MutableCounterInt;
 import org.apache.hadoop.metrics2.lib.MutableCounterLong;
 import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -490,8 +491,14 @@ public class ShuffleHandler extends Abst
             return;
           }
         } catch (IOException e) {
-          LOG.error("Shuffle error ", e);
-          sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
+          LOG.error("Shuffle error :", e);
+          StringBuffer sb = new StringBuffer(e.getMessage());
+          Throwable t = e;
+          while (t.getCause() != null) {
+            sb.append(t.getCause().getMessage());
+            t = t.getCause();
+          }
+          sendError(ctx,sb.toString() , INTERNAL_SERVER_ERROR);
           return;
         }
       }
@@ -572,7 +579,7 @@ public class ShuffleHandler extends Abst
       final File spillfile = new File(mapOutputFileName.toString());
       RandomAccessFile spill;
       try {
-        spill = new RandomAccessFile(spillfile, "r");
+        spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
       } catch (FileNotFoundException e) {
         LOG.info(spillfile + " not found");
         return null;

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1481658&r1=1481657&r2=1481658&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Sun May 12 21:59:48 2013
@@ -24,22 +24,48 @@ import static org.apache.hadoop.test.Moc
 import static org.apache.hadoop.test.MockitoMaker.stub;
 import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
 import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.net.HttpURLConnection;
 import java.net.SocketException;
 import java.net.URL;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
 import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.metrics2.MetricsSource;
 import org.apache.hadoop.metrics2.MetricsSystem;
 import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.util.BuilderUtils;
 import org.jboss.netty.channel.Channel;
 import org.jboss.netty.channel.ChannelFuture;
 import org.jboss.netty.channel.ChannelHandlerContext;
@@ -245,4 +271,131 @@ public class TestShuffleHandler {
     
     shuffleHandler.stop(); 
   }
+  
+  @Test(timeout = 100000)
+  public void testMapFileAccess() throws IOException {
+    // Runs only if NativeIO is available, as SecureIOUtils needs it
+    assumeTrue(NativeIO.isAvailable());
+    Configuration conf = new Configuration();
+    conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+    conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+    conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+        "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    File absLogDir = new File("target",
+        TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
+    conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+    ApplicationId appId = BuilderUtils.newApplicationId(12345, 1);
+    String appAttemptId = "attempt_12345_1_m_1_0";
+    // Expected owner; deliberately not the owner of the files created below
+    String user = "randomUser";
+    String reducerId = "0";
+    List<File> fileMap = new ArrayList<File>();
+    createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+        conf, fileMap);
+    ShuffleHandler shuffleHandler = new ShuffleHandler() {
+      @Override
+      protected Shuffle getShuffle(Configuration conf) {
+        // replace the shuffle handler with one stubbed for testing
+        return new Shuffle(conf) {
+          @Override
+          protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+              HttpRequest request, HttpResponse response, URL requestUri)
+              throws IOException {
+          }
+        };
+      }
+    };
+    shuffleHandler.init(conf);
+    try {
+      shuffleHandler.start();
+      DataOutputBuffer outputBuffer = new DataOutputBuffer();
+      outputBuffer.reset();
+      Token<JobTokenIdentifier> jt =
+          new Token<JobTokenIdentifier>("identifier".getBytes(),
+              "password".getBytes(), new Text(user), new Text("shuffleService"));
+      jt.write(outputBuffer);
+      shuffleHandler.initApp(user, appId,
+          ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength()));
+      URL url = new URL("http://127.0.0.1:"
+          + shuffleHandler.getConfig().get(
+              ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+          + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+          + "&map=" + appAttemptId);
+      HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+      conn.connect();
+      byte[] byteArr = new byte[10000];
+      DataInputStream input = new DataInputStream(conn.getInputStream());
+      try {
+        input.readFully(byteArr);
+      } catch (EOFException e) {
+        // expected whenever the response is shorter than byteArr
+      } finally {
+        input.close();
+      }
+      // Retrieve file owner name
+      FileInputStream fis = new FileInputStream(fileMap.get(0));
+      String owner;
+      try {
+        owner = NativeIO.POSIX.getFstat(fis.getFD()).getOwner();
+      } finally {
+        fis.close();
+      }
+      String message =
+          "Owner '" + owner + "' for path " + fileMap.get(0).getAbsolutePath()
+              + " did not match expected owner '" + user + "'";
+      Assert.assertTrue((new String(byteArr)).contains(message));
+    } finally {
+      shuffleHandler.stop();
+    }
+  }
+
+  public static void createShuffleHandlerFiles(File logDir, String user,
+      String appId, String appAttemptId, Configuration conf,
+      List<File> fileMap) throws IOException {
+    // <logDir>/USERCACHE/<user>/APPCACHE/<appId>/output/<appAttemptId>
+    String attemptDir = StringUtils.join(Path.SEPARATOR, new String[] {
+        logDir.getAbsolutePath(), ContainerLocalizer.USERCACHE, user,
+        ContainerLocalizer.APPCACHE, appId, "output", appAttemptId });
+    File appAttemptDir = new File(attemptDir);
+    if (!appAttemptDir.isDirectory() && !appAttemptDir.mkdirs()) {
+      throw new IOException("Unable to create directory " + appAttemptDir);
+    }
+    File indexFile = new File(appAttemptDir, "file.out.index");
+    fileMap.add(indexFile);
+    createIndexFile(indexFile, conf);
+    File mapOutputFile = new File(appAttemptDir, "file.out");
+    fileMap.add(mapOutputFile);
+    createMapOutputFile(mapOutputFile, conf);
+  }
+
+  public static void createMapOutputFile(File mapOutputFile,
+      Configuration conf) throws IOException {
+    byte[] payload =
+        "Creating new dummy map output file. Used only for testing".getBytes();
+    FileOutputStream sink = new FileOutputStream(mapOutputFile);
+    sink.write(payload);
+    sink.flush();
+    sink.close();
+  }
+
+  public static void createIndexFile(File indexFile, Configuration conf)
+      throws IOException {
+    if (indexFile.exists() && !indexFile.delete()) { // start from scratch
+      throw new IOException("Unable to delete existing file " + indexFile);
+    }
+    indexFile.createNewFile();
+    FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(
+        new Path(indexFile.getAbsolutePath()));
+    try {
+      Checksum crc = new PureJavaCrc32();
+      CheckedOutputStream chk = new CheckedOutputStream(output, crc);
+      byte[] msg = ("Writing new index file. This file will be used only "
+          + "for the testing.").getBytes();
+      chk.write(Arrays.copyOf(msg, MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH));
+      output.writeLong(chk.getChecksum().getValue()); // CRC trailer
+    } finally {
+      output.close();
+    }
+  }
 }