Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/05/12 23:59:16 UTC
svn commit: r1481657 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/
hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/ha...
Author: vinodkv
Date: Sun May 12 21:59:16 2013
New Revision: 1481657
URL: http://svn.apache.org/r1481657
Log:
MAPREDUCE-5208. Modified ShuffleHandler to use SecureIOUtils for reading local files. Contributed by Omkar Vinit Joshi.
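
The substance of the change: where the shuffle path previously opened spill and index files directly, it now goes through SecureIOUtils, which checks the on-disk owner of the file before handing back a stream. A minimal standalone sketch of the pattern, using a hypothetical path and owner name (the real call sites are in the SpillRecord.java and ShuffleHandler.java hunks below; passing null for the group skips the group check, as those call sites do):

  import java.io.File;
  import java.io.IOException;
  import java.io.RandomAccessFile;

  import org.apache.hadoop.fs.FSDataInputStream;
  import org.apache.hadoop.io.SecureIOUtils;

  public class SecureShuffleReadSketch {
    public static void main(String[] args) throws IOException {
      File spillFile = new File("/tmp/file.out");       // hypothetical path
      File indexFile = new File("/tmp/file.out.index"); // hypothetical path
      String expectedOwner = "jobuser";                 // hypothetical owner

      // Owner-checked random-access read of the map output; replaces the
      // plain 'new RandomAccessFile(spillFile, "r")'. Throws IOException
      // if the file's actual owner does not match expectedOwner.
      RandomAccessFile spill =
          SecureIOUtils.openForRandomRead(spillFile, "r", expectedOwner, null);
      spill.close();

      // Owner-checked sequential read of the spill index; replaces the
      // plain 'rfs.open(indexFileName)'.
      FSDataInputStream in =
          SecureIOUtils.openFSDataInputStream(indexFile, expectedOwner, null);
      in.close();
    }
  }

SecureIOUtils needs native code to enforce the owner check, which is why the new test below is skipped unless NativeIO.isAvailable().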
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1481657&r1=1481656&r2=1481657&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Sun May 12 21:59:16 2013
@@ -401,6 +401,9 @@ Release 2.0.5-beta - UNRELEASED
MAPREDUCE-5239. Updated MR App to reflect YarnRemoteException changes after
YARN-634. (Siddharth Seth via vinodkv)
+ MAPREDUCE-5208. Modified ShuffleHandler to use SecureIOUtils for reading
+ local files. (Omkar Vinit Joshi via vinodkv)
+
Release 2.0.4-alpha - 2013-04-25
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java?rev=1481657&r1=1481656&r2=1481657&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SpillRecord.java Sun May 12 21:59:16 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
import static org.apache.hadoop.mapred.MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH;
+import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.LongBuffer;
@@ -34,6 +35,7 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.util.PureJavaCrc32;
@InterfaceAudience.LimitedPrivate({"MapReduce"})
@@ -65,17 +67,19 @@ public class SpillRecord {
throws IOException {
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
- final FSDataInputStream in = rfs.open(indexFileName);
+ final FSDataInputStream in =
+ SecureIOUtils.openFSDataInputStream(new File(indexFileName.toUri()
+ .getRawPath()), expectedIndexOwner, null);
try {
final long length = rfs.getFileStatus(indexFileName).getLen();
final int partitions = (int) length / MAP_OUTPUT_INDEX_RECORD_LENGTH;
final int size = partitions * MAP_OUTPUT_INDEX_RECORD_LENGTH;
-
buf = ByteBuffer.allocate(size);
if (crc != null) {
crc.reset();
CheckedInputStream chk = new CheckedInputStream(in, crc);
IOUtils.readFully(chk, buf.array(), 0, size);
+
if (chk.getChecksum().getValue() != in.readLong()) {
throw new ChecksumException("Checksum error reading spill index: " +
indexFileName, -1);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java?rev=1481657&r1=1481656&r2=1481657&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/main/java/org/apache/hadoop/mapred/ShuffleHandler.java Sun May 12 21:59:16 2013
@@ -58,9 +58,9 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataInputByteBuffer;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.SecureIOUtils;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.security.SecureShuffleUtils;
-import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
@@ -71,6 +71,7 @@ import org.apache.hadoop.metrics2.lib.De
import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
+import org.apache.hadoop.security.ssl.SSLFactory;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -490,8 +491,14 @@ public class ShuffleHandler extends Abst
return;
}
} catch (IOException e) {
- LOG.error("Shuffle error ", e);
- sendError(ctx, e.getMessage(), INTERNAL_SERVER_ERROR);
+ LOG.error("Shuffle error: ", e);
+ StringBuffer sb = new StringBuffer(e.getMessage());
+ Throwable t = e;
+ while (t.getCause() != null) {
+ sb.append(t.getCause().getMessage());
+ t = t.getCause();
+ }
+ sendError(ctx, sb.toString(), INTERNAL_SERVER_ERROR);
return;
}
}
@@ -572,7 +579,7 @@ public class ShuffleHandler extends Abst
final File spillfile = new File(mapOutputFileName.toString());
RandomAccessFile spill;
try {
- spill = new RandomAccessFile(spillfile, "r");
+ spill = SecureIOUtils.openForRandomRead(spillfile, "r", user, null);
} catch (FileNotFoundException e) {
LOG.info(spillfile + " not found");
return null;
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java?rev=1481657&r1=1481656&r2=1481657&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/java/org/apache/hadoop/mapred/TestShuffleHandler.java Sun May 12 21:59:16 2013
@@ -24,22 +24,48 @@ import static org.apache.hadoop.test.Moc
import static org.apache.hadoop.test.MockitoMaker.stub;
import static org.jboss.netty.buffer.ChannelBuffers.wrappedBuffer;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.SocketException;
import java.net.URL;
+import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.zip.CheckedOutputStream;
+import java.util.zip.Checksum;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DataOutputBuffer;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.mapreduce.security.token.JobTokenIdentifier;
import org.apache.hadoop.mapreduce.task.reduce.ShuffleHeader;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.impl.MetricsSystemImpl;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.PureJavaCrc32;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.ContainerLocalizer;
+import org.apache.hadoop.yarn.util.BuilderUtils;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelHandlerContext;
@@ -245,4 +271,131 @@ public class TestShuffleHandler {
shuffleHandler.stop();
}
+
+ @Test(timeout = 100000)
+ public void testMapFileAccess() throws IOException {
+ // This will run only if NativeIO is enabled, as SecureIOUtils needs it
+ assumeTrue(NativeIO.isAvailable());
+ Configuration conf = new Configuration();
+ conf.setInt(ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY, 0);
+ conf.setInt(ShuffleHandler.MAX_SHUFFLE_CONNECTIONS, 3);
+ conf.set(CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
+ "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ File absLogDir = new File("target",
+ TestShuffleHandler.class.getSimpleName() + "LocDir").getAbsoluteFile();
+ conf.set(YarnConfiguration.NM_LOCAL_DIRS, absLogDir.getAbsolutePath());
+ ApplicationId appId = BuilderUtils.newApplicationId(12345, 1);
+ System.out.println(appId.toString());
+ String appAttemptId = "attempt_12345_1_m_1_0";
+ String user = "randomUser";
+ String reducerId = "0";
+ List<File> fileMap = new ArrayList<File>();
+ createShuffleHandlerFiles(absLogDir, user, appId.toString(), appAttemptId,
+ conf, fileMap);
+ ShuffleHandler shuffleHandler = new ShuffleHandler() {
+
+ @Override
+ protected Shuffle getShuffle(Configuration conf) {
+ // replace the shuffle handler with one stubbed for testing
+ return new Shuffle(conf) {
+
+ @Override
+ protected void verifyRequest(String appid, ChannelHandlerContext ctx,
+ HttpRequest request, HttpResponse response, URL requestUri)
+ throws IOException {
+ }
+
+ };
+ }
+ };
+ shuffleHandler.init(conf);
+ try {
+ shuffleHandler.start();
+ DataOutputBuffer outputBuffer = new DataOutputBuffer();
+ outputBuffer.reset();
+ Token<JobTokenIdentifier> jt =
+ new Token<JobTokenIdentifier>("identifier".getBytes(),
+ "password".getBytes(), new Text(user), new Text("shuffleService"));
+ jt.write(outputBuffer);
+ shuffleHandler.initApp(user, appId,
+ ByteBuffer.wrap(outputBuffer.getData(), 0, outputBuffer.getLength()));
+ URL url =
+ new URL(
+ "http://127.0.0.1:"
+ + shuffleHandler.getConfig().get(
+ ShuffleHandler.SHUFFLE_PORT_CONFIG_KEY)
+ + "/mapOutput?job=job_12345_0001&reduce=" + reducerId
+ + "&map=attempt_12345_1_m_1_0");
+ HttpURLConnection conn = (HttpURLConnection) url.openConnection();
+ conn.connect();
+ byte[] byteArr = new byte[10000];
+ try {
+ DataInputStream is = new DataInputStream(conn.getInputStream());
+ is.readFully(byteArr);
+ } catch (EOFException e) {
+ // ignore
+ }
+ // Retrieve file owner name
+ FileInputStream is = new FileInputStream(fileMap.get(0));
+ String owner = NativeIO.POSIX.getFstat(is.getFD()).getOwner();
+ is.close();
+
+ String message =
+ "Owner '" + owner + "' for path " + fileMap.get(0).getAbsolutePath()
+ + " did not match expected owner '" + user + "'";
+ Assert.assertTrue((new String(byteArr)).contains(message));
+ } finally {
+ shuffleHandler.stop();
+ }
+ }
+
+ public static void createShuffleHandlerFiles(File logDir, String user,
+ String appId, String appAttemptId, Configuration conf,
+ List<File> fileMap) throws IOException {
+ String attemptDir =
+ StringUtils.join(Path.SEPARATOR,
+ new String[] { logDir.getAbsolutePath(),
+ ContainerLocalizer.USERCACHE, user,
+ ContainerLocalizer.APPCACHE, appId, "output", appAttemptId });
+ File appAttemptDir = new File(attemptDir);
+ appAttemptDir.mkdirs();
+ System.out.println(appAttemptDir.getAbsolutePath());
+ File indexFile = new File(appAttemptDir, "file.out.index");
+ fileMap.add(indexFile);
+ createIndexFile(indexFile, conf);
+ File mapOutputFile = new File(appAttemptDir, "file.out");
+ fileMap.add(mapOutputFile);
+ createMapOutputFile(mapOutputFile, conf);
+ }
+
+ public static void
+ createMapOutputFile(File mapOutputFile, Configuration conf)
+ throws IOException {
+ FileOutputStream out = new FileOutputStream(mapOutputFile);
+ out.write("Creating new dummy map output file. Used only for testing"
+ .getBytes());
+ out.flush();
+ out.close();
+ }
+
+ public static void createIndexFile(File indexFile, Configuration conf)
+ throws IOException {
+ if (indexFile.exists()) {
+ System.out.println("Deleting existing file");
+ indexFile.delete();
+ }
+ indexFile.createNewFile();
+ FSDataOutputStream output = FileSystem.getLocal(conf).getRaw().append(
+ new Path(indexFile.getAbsolutePath()));
+ Checksum crc = new PureJavaCrc32();
+ crc.reset();
+ CheckedOutputStream chk = new CheckedOutputStream(output, crc);
+ String msg = "Writing new index file. This file will be used only " +
+ "for testing.";
+ chk.write(Arrays.copyOf(msg.getBytes(),
+ MapTask.MAP_OUTPUT_INDEX_RECORD_LENGTH));
+ output.writeLong(chk.getChecksum().getValue());
+ output.close();
+ }
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf?rev=1481657&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-shuffle/src/test/resources/krb5.conf Sun May 12 21:59:16 2013
@@ -0,0 +1,28 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+[libdefaults]
+ default_realm = APACHE.ORG
+ udp_preference_limit = 1
+ extra_addresses = 127.0.0.1
+[realms]
+ APACHE.ORG = {
+ admin_server = localhost:88
+ kdc = localhost:88
+ }
+[domain_realm]
+ localhost = APACHE.ORG