You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:42:39 UTC
svn commit: r1077116 [2/2] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
c++/task-controller/ c++/task-controller/tests/
docs/src/documentation/content/xdocs/ mapred/org/apache/hadoop/filecache/
mapred/org/apache/hadoop/mapred/ mapred/...
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java?rev=1077116&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/tasktracker/Localizer.java Fri Mar 4 03:42:38 2011
@@ -0,0 +1,361 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapreduce.server.tasktracker;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobID;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskController.InitializationContext;
+
+/**
+ *
+ * NOTE: This class is internal only and not intended for users!!
+ */
+public class Localizer {
+
+ static final Log LOG = LogFactory.getLog(Localizer.class);
+
+ private FileSystem fs;
+ private String[] localDirs;
+ private TaskController taskController;
+
+ /**
+  * Builds a localizer bound to a file system, a set of local directories
+  * and a task-controller.
+  *
+  * @param fileSys file system used to check for and create the user, job
+  *          and attempt directories
+  * @param lDirs base local directories; every initialize* method of this
+  *          class iterates over all of them
+  * @param tc task-controller whose user initialization is invoked once the
+  *          directories of a user have been set up
+  */
+ public Localizer(FileSystem fileSys, String[] lDirs, TaskController tc) {
+   this.taskController = tc;
+   this.localDirs = lDirs;
+   this.fs = fileSys;
+ }
+
+ /**
+  * Helper for setting permissions on local files through the
+  * {@link java.io.File} API.
+  *
+  * NOTE: This class is internal only class and not intended for users!!
+  */
+ public static class PermissionsHandler {
+   /**
+    * Permission information useful for setting permissions for a given path.
+    * Using this, one can set all possible combinations of permissions for the
+    * owner of the file. But permissions for the group and all others can only
+    * be set together, i.e. permissions for group cannot be set different from
+    * those for others and vice versa.
+    */
+   public static class PermissionsInfo {
+     /** Whether read permission is to be granted. */
+     public boolean readPermissions;
+     /** Whether write permission is to be granted. */
+     public boolean writePermissions;
+     /** Whether execute permission is to be granted. */
+     public boolean executablePermissions;
+     /** If true, the read setting applies to the owner only. */
+     public boolean readPermsOwnerOnly;
+     /** If true, the write setting applies to the owner only. */
+     public boolean writePermsOwnerOnly;
+     /** If true, the execute setting applies to the owner only. */
+     public boolean executePermsOwnerOnly;
+
+     /**
+      * Create a permissions-info object with the given attributes
+      *
+      * @param readPerms value for {@link #readPermissions}
+      * @param writePerms value for {@link #writePermissions}
+      * @param executePerms value for {@link #executablePermissions}
+      * @param readOwnerOnly value for {@link #readPermsOwnerOnly}
+      * @param writeOwnerOnly value for {@link #writePermsOwnerOnly}
+      * @param executeOwnerOnly value for {@link #executePermsOwnerOnly}
+      */
+     public PermissionsInfo(boolean readPerms, boolean writePerms,
+         boolean executePerms, boolean readOwnerOnly, boolean writeOwnerOnly,
+         boolean executeOwnerOnly) {
+       readPermissions = readPerms;
+       writePermissions = writePerms;
+       executablePermissions = executePerms;
+       readPermsOwnerOnly = readOwnerOnly;
+       writePermsOwnerOnly = writeOwnerOnly;
+       executePermsOwnerOnly = executeOwnerOnly;
+     }
+   }
+
+   /**
+    * Set permission on the given file path using the specified permissions
+    * information. We use java api to set permission instead of spawning chmod
+    * processes. This saves a lot of time. Using this, one can set all possible
+    * combinations of permissions for the owner of the file. But permissions
+    * for the group and all others can only be set together, i.e. permissions
+    * for group cannot be set different from those for others and vice versa.
+    *
+    * All flags are first cleared for everybody and then the requested ones
+    * are applied. Every step is always attempted; the result is true only if
+    * all of them, including the clearing steps, succeeded.
+    *
+    * This method should satisfy the needs of most of the applications. For
+    * those it doesn't, {@link FileUtil#chmod} can be used.
+    *
+    * @param f file path
+    * @param pInfo permissions information; when null nothing is changed and
+    *          true is returned
+    * @return true if success, false otherwise
+    */
+   public static boolean setPermissions(File f, PermissionsInfo pInfo) {
+     if (pInfo == null) {
+       LOG.debug(" PermissionsInfo is null, returning.");
+       return true;
+     }
+
+     LOG.debug("Setting permission for " + f.getAbsolutePath());
+
+     // Clear all the flags. Each call is the left operand of &&, so it is
+     // always executed regardless of earlier failures.
+     boolean ret = f.setReadable(false, false);
+     ret = f.setWritable(false, false) && ret;
+     ret = f.setExecutable(false, false) && ret;
+
+     // Apply the requested flags. The outcome of each call is logged on its
+     // own and then folded into the overall result, so that a failure while
+     // clearing the flags above is no longer lost.
+     boolean readable =
+         f.setReadable(pInfo.readPermissions, pInfo.readPermsOwnerOnly);
+     LOG.debug("Readable status for " + f + " set to " + readable);
+     ret = readable && ret;
+
+     boolean writable =
+         f.setWritable(pInfo.writePermissions, pInfo.writePermsOwnerOnly);
+     LOG.debug("Writable status for " + f + " set to " + writable);
+     ret = writable && ret;
+
+     boolean executable =
+         f.setExecutable(pInfo.executablePermissions,
+             pInfo.executePermsOwnerOnly);
+     LOG.debug("Executable status for " + f + " set to " + executable);
+     ret = executable && ret;
+
+     return ret;
+   }
+
+   /**
+    * Permissions rwxr_xr_x
+    */
+   public static final PermissionsInfo sevenFiveFive =
+       new PermissionsInfo(true, true, true, false, true, false);
+   /**
+    * Completely private permissions
+    */
+   public static final PermissionsInfo sevenZeroZero =
+       new PermissionsInfo(true, true, true, true, true, true);
+ }
+
+ // Data-structure for synchronizing localization of user directories.
+ private Map<String, AtomicBoolean> localizedUsers =
+ new HashMap<String, AtomicBoolean>();
+
+ /**
+ * Initialize the local directories for a particular user on this TT. This
+ * involves creation and setting permissions of the following directories
+ * <ul>
+ * <li>$mapred.local.dir/taskTracker/$user</li>
+ * <li>$mapred.local.dir/taskTracker/$user/jobcache</li>
+ * <li>$mapred.local.dir/taskTracker/$user/distcache</li>
+ * </ul>
+ *
+ * @param user
+ * @throws IOException
+ */
+ public void initializeUserDirs(String user)
+ throws IOException {
+
+ if (user == null) {
+ // This shouldn't happen in general
+ throw new IOException(
+ "User is null. Cannot initialized user-directories.");
+ }
+
+ AtomicBoolean localizedUser;
+ synchronized (localizedUsers) {
+ if (!localizedUsers.containsKey(user)) {
+ localizedUsers.put(user, new AtomicBoolean(false));
+ }
+ localizedUser = localizedUsers.get(user);
+ }
+
+ synchronized (localizedUser) {
+
+ if (localizedUser.get()) {
+ // User-directories are already localized for his user.
+ LOG.info("User-directories for the user " + user
+ + " are already initialized on this TT. Not doing anything.");
+ return;
+ }
+
+ LOG.info("Initializing user " + user + " on this TT.");
+
+ boolean userDirStatus = false;
+ boolean jobCacheDirStatus = false;
+ boolean distributedCacheDirStatus = false;
+
+ for (String localDir : localDirs) {
+
+ Path userDir = new Path(localDir, TaskTracker.getUserDir(user));
+
+ // Set up the user-directory.
+ if (fs.exists(userDir) || fs.mkdirs(userDir)) {
+
+ // Set permissions on the user-directory
+ PermissionsHandler.setPermissions(
+ new File(userDir.toUri().getPath()),
+ PermissionsHandler.sevenZeroZero);
+ userDirStatus = true;
+
+ // Set up the jobcache directory
+ File jobCacheDir =
+ new File(localDir, TaskTracker.getJobCacheSubdir(user));
+ if (jobCacheDir.exists() || jobCacheDir.mkdirs()) {
+ // Set permissions on the jobcache-directory
+ PermissionsHandler.setPermissions(jobCacheDir,
+ PermissionsHandler.sevenZeroZero);
+ jobCacheDirStatus = true;
+ } else {
+ LOG.warn("Unable to create job cache directory : "
+ + jobCacheDir.getPath());
+ }
+
+ // Set up the cache directory used for distributed cache files
+ File distributedCacheDir =
+ new File(localDir, TaskTracker.getDistributedCacheDir(user));
+ if (distributedCacheDir.exists() || distributedCacheDir.mkdirs()) {
+ // Set permissions on the distcache-directory
+ PermissionsHandler.setPermissions(distributedCacheDir,
+ PermissionsHandler.sevenZeroZero);
+ distributedCacheDirStatus = true;
+ } else {
+ LOG.warn("Unable to create distributed-cache directory : "
+ + distributedCacheDir.getPath());
+ }
+ } else {
+ LOG.warn("Unable to create the user directory : " + userDir);
+ }
+ }
+
+ if (!userDirStatus) {
+ throw new IOException("Not able to initialize user directories "
+ + "in any of the configured local directories for user " + user);
+ }
+ if (!jobCacheDirStatus) {
+ throw new IOException("Not able to initialize job-cache directories "
+ + "in any of the configured local directories for user " + user);
+ }
+ if (!distributedCacheDirStatus) {
+ throw new IOException(
+ "Not able to initialize distributed-cache directories "
+ + "in any of the configured local directories for user "
+ + user);
+ }
+
+ // Now, run the task-controller specific code to initialize the
+ // user-directories.
+ InitializationContext context = new InitializationContext();
+ context.user = user;
+ context.workDir = null;
+ taskController.initializeUser(context);
+
+ // Localization of the user is done
+ localizedUser.set(true);
+ }
+ }
+
+ /**
+  * Prepare the job directories for a given job. To be called by the job
+  * localization code, only if the job is not already localized.
+  *
+  * <br>
+  * Here, we set 700 permissions on the job directories created on all disks.
+  * This we do so as to avoid any misuse by other users till the time
+  * {@code TaskController#initializeJob} is run at a later time to set proper
+  * private permissions on the job directories. <br>
+  *
+  * A job directory that already exists is deleted recursively and created
+  * afresh. Failures on individual local directories are logged as warnings;
+  * the call succeeds as long as the job directory could be created in at
+  * least one local directory.
+  *
+  * @param user user owning the job
+  * @param jobId id of the job whose directories are to be prepared
+  * @throws IOException if the job directory could not be created in any of
+  *           the local directories
+  */
+ public void initializeJobDirs(String user, JobID jobId)
+     throws IOException {
+   boolean initJobDirStatus = false;
+   String jobDirPath = TaskTracker.getLocalJobDir(user, jobId.toString());
+   for (String localDir : localDirs) {
+     Path jobDir = new Path(localDir, jobDirPath);
+     // An existing job-dir will happen on a partial execution of localizeJob.
+     // Sometimes copying job.xml to the local disk succeeds but copying
+     // job.jar might throw out an exception. We should clean up and then try
+     // again.
+     if (fs.exists(jobDir) && !fs.delete(jobDir, true)) {
+       LOG.warn("Not able to delete existing job directory " + jobDir);
+     }
+
+     if (!fs.mkdirs(jobDir)) {
+       LOG.warn("Not able to create job directory " + jobDir.toString());
+       // Nothing to set permissions on; try the next local directory.
+       continue;
+     }
+     initJobDirStatus = true;
+
+     // job-dir has to be private to the TT
+     File jobDirFile = new File(jobDir.toUri().getPath());
+     if (!Localizer.PermissionsHandler.setPermissions(jobDirFile,
+         Localizer.PermissionsHandler.sevenZeroZero)) {
+       LOG.warn("Not able to set private permissions on job directory "
+           + jobDir);
+     }
+   }
+
+   if (!initJobDirStatus) {
+     throw new IOException("Not able to initialize job directories "
+         + "in any of the configured local directories for job "
+         + jobId.toString());
+   }
+ }
+
+ /**
+  * Creates the directory of a task attempt under every local directory.
+  * Doing this up-front matters because, in some cases, like when
+  * LinuxTaskController is in use, the child might wish to balance load
+  * across disks but cannot itself create the attempt directory, the job
+  * directory being writable only by the TT.
+  *
+  * @param user user owning the job
+  * @param jobId id of the job the attempt belongs to
+  * @param attemptId id of the task attempt
+  * @param isCleanupAttempt whether this is a cleanup attempt
+  * @throws IOException if the attempt directory could not be created in any
+  *           of the local directories
+  */
+ public void initializeAttemptDirs(String user, String jobId,
+     String attemptId, boolean isCleanupAttempt)
+     throws IOException {
+
+   final String relativeAttemptDir =
+       TaskTracker.getLocalTaskDir(user, jobId, attemptId, isCleanupAttempt);
+
+   boolean createdSomewhere = false;
+   for (String baseDir : localDirs) {
+     Path dirOnThisDisk = new Path(baseDir, relativeAttemptDir);
+
+     // Creation is attempted on every disk, whatever happened on the others.
+     if (fs.mkdirs(dirOnThisDisk)) {
+       createdSomewhere = true;
+     } else {
+       LOG.warn("localAttemptDir " + dirOnThisDisk
+           + " couldn't be created.");
+     }
+   }
+
+   if (createdSomewhere) {
+     return;
+   }
+   throw new IOException("Not able to initialize attempt directories "
+       + "in any of the configured local directories for the attempt "
+       + attemptId.toString());
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java?rev=1077116&r1=1077115&r2=1077116&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/filecache/TestTrackerDistributedCacheManager.java Fri Mar 4 03:42:38 2011
@@ -24,33 +24,72 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.util.Random;
+import javax.security.auth.login.LoginException;
+
import junit.framework.TestCase;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.DefaultTaskController;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.TaskController;
+import org.apache.hadoop.mapred.TaskTracker;
+import org.apache.hadoop.mapred.TaskController.InitializationContext;
+import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.filecache.TaskDistributedCacheManager;
+import org.apache.hadoop.filecache.TrackerDistributedCacheManager;
+import org.apache.hadoop.security.UserGroupInformation;
public class TestTrackerDistributedCacheManager extends TestCase {
- private static final String TEST_LOCAL_DIR_PROP = "test.local.dir";
- private static String TEST_CACHE_BASE_DIR =
- new Path(System.getProperty("test.build.data","/tmp/cachebasedir"))
- .toString().replace(' ', '+');
- private static String TEST_ROOT_DIR =
- System.getProperty("test.build.data", "/tmp/distributedcache");
+
+ protected String TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data", "/tmp"),
+ TestTrackerDistributedCacheManager.class.getSimpleName())
+ .getAbsolutePath();
+
+ protected File ROOT_MAPRED_LOCAL_DIR;
+ private static String TEST_CACHE_BASE_DIR;
+ protected int numLocalDirs = 6;
+
private static final int TEST_FILE_SIZE = 4 * 1024; // 4K
private static final int LOCAL_CACHE_LIMIT = 5 * 1024; //5K
- private Configuration conf;
- private Path firstCacheFile;
- private Path secondCacheFile;
+ protected Configuration conf;
+ protected Path firstCacheFile;
+ protected Path secondCacheFile;
+
+ protected LocalDirAllocator localDirAllocator =
+ new LocalDirAllocator(JobConf.MAPRED_LOCAL_DIR_PROPERTY);
@Override
protected void setUp() throws IOException {
+
+ // Prepare the tests' root dir
+ File TEST_ROOT = new File(TEST_ROOT_DIR);
+ if (!TEST_ROOT.exists()) {
+ TEST_ROOT.mkdirs();
+ }
+
+ // Prepare the tests' mapred-local-dir
+ ROOT_MAPRED_LOCAL_DIR = new File(TEST_ROOT_DIR, "mapred/local");
+ ROOT_MAPRED_LOCAL_DIR.mkdirs();
+ String []localDirs = new String[numLocalDirs];
+ for (int i = 0; i < numLocalDirs; i++) {
+ localDirs[i] = new File(ROOT_MAPRED_LOCAL_DIR, "0_" + i).getPath();
+ }
+
+ TEST_CACHE_BASE_DIR =
+ new File(TEST_ROOT_DIR, "cachebasedir").getAbsolutePath();
+
conf = new Configuration();
conf.setLong("local.cache.size", LOCAL_CACHE_LIMIT);
- conf.set(TEST_LOCAL_DIR_PROP, TEST_ROOT_DIR);
+ conf.setStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY, localDirs);
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
+
+ // Create the temporary cache files to be used in the tests.
firstCacheFile = new Path(TEST_ROOT_DIR, "firstcachefile");
secondCacheFile = new Path(TEST_ROOT_DIR, "secondcachefile");
createTempFile(firstCacheFile);
@@ -59,29 +98,43 @@ public class TestTrackerDistributedCache
/**
* This is the typical flow for using the DistributedCache classes.
+ *
+ * @throws IOException
+ * @throws LoginException
*/
- public void testManagerFlow() throws IOException {
- TrackerDistributedCacheManager manager =
- new TrackerDistributedCacheManager(conf);
- LocalDirAllocator localDirAllocator =
- new LocalDirAllocator(TEST_LOCAL_DIR_PROP);
+ public void testManagerFlow() throws IOException, LoginException {
+ // ****** Imitate JobClient code
// Configures a task/job with both a regular file and a "classpath" file.
Configuration subConf = new Configuration(conf);
DistributedCache.addCacheFile(firstCacheFile.toUri(), subConf);
DistributedCache.addFileToClassPath(secondCacheFile, subConf);
TrackerDistributedCacheManager.determineTimestamps(subConf);
+ // ****** End of imitating JobClient code
Path jobFile = new Path(TEST_ROOT_DIR, "job.xml");
FileOutputStream os = new FileOutputStream(new File(jobFile.toString()));
subConf.writeXml(os);
os.close();
+ String userName = getJobOwnerName();
+
+ // ****** Imitate TaskRunner code.
+ TrackerDistributedCacheManager manager =
+ new TrackerDistributedCacheManager(conf);
TaskDistributedCacheManager handle =
manager.newTaskDistributedCacheManager(subConf);
assertNull(null, DistributedCache.getLocalCacheFiles(subConf));
- handle.setup(localDirAllocator,
- new File(new Path(TEST_ROOT_DIR, "workdir").toString()), "distcache");
+ File workDir = new File(new Path(TEST_ROOT_DIR, "workdir").toString());
+ handle.setup(localDirAllocator, workDir, TaskTracker
+ .getDistributedCacheDir(userName));
+
+ InitializationContext context = new InitializationContext();
+ context.user = userName;
+ context.workDir = workDir;
+ getTaskController().initializeDistributedCache(context);
+ // ****** End of imitating TaskRunner code
+
Path[] localCacheFiles = DistributedCache.getLocalCacheFiles(subConf);
assertNotNull(null, localCacheFiles);
assertEquals(2, localCacheFiles.length);
@@ -94,12 +147,39 @@ public class TestTrackerDistributedCache
assertEquals(1, handle.getClassPaths().size());
assertEquals(cachedSecondFile.toString(), handle.getClassPaths().get(0));
+ checkFilePermissions(localCacheFiles);
+
// Cleanup
handle.release();
manager.purgeCache();
assertFalse(pathToFile(cachedFirstFile).exists());
}
+ /**
+  * Asserts that the first two localized cache files are executable.
+  *
+  * @param localCacheFiles localized cache files; the elements at index 0
+  *          and 1 are checked
+  * @throws IOException declared for overriding implementations; nothing in
+  *           this implementation throws it explicitly
+  */
+ protected void checkFilePermissions(Path[] localCacheFiles)
+     throws IOException {
+   Path first = localCacheFiles[0];
+   Path second = localCacheFiles[1];
+
+   // Both the files should have executable permissions on them.
+   File firstOnDisk = new File(first.toUri().getPath());
+   assertTrue("First cache file is not executable!",
+       firstOnDisk.canExecute());
+
+   File secondOnDisk = new File(second.toUri().getPath());
+   assertTrue("Second cache file is not executable!",
+       secondOnDisk.canExecute());
+ }
+
+ /**
+ * Supplies the task-controller used by the tests of this class. This
+ * implementation creates a new {@link DefaultTaskController} on every call.
+ * NOTE(review): protected, presumably so that subclasses can substitute
+ * another controller - confirm against the subclasses.
+ *
+ * @return a new DefaultTaskController
+ */
+ protected TaskController getTaskController() {
+ return new DefaultTaskController();
+ }
+
+ /**
+ * Determines the user name to use as the job owner by logging in through
+ * {@link UserGroupInformation#login} with the test configuration.
+ *
+ * @return the user name reported by the resulting UserGroupInformation
+ * @throws LoginException presumably propagated from the login call
+ */
+ protected String getJobOwnerName() throws LoginException {
+ UserGroupInformation ugi = UserGroupInformation.login(conf);
+ return ugi.getUserName();
+ }
/** test delete cache */
public void testDeleteCache() throws Exception {
@@ -122,7 +202,7 @@ public class TestTrackerDistributedCache
new Path(TEST_CACHE_BASE_DIR));
assertTrue("DistributedCache failed deleting old" +
" cache when the cache store is full.",
- dirStatuses.length > 1);
+ dirStatuses.length == 1);
}
public void testFileSystemOtherThanDefault() throws Exception {
@@ -152,15 +232,16 @@ public class TestTrackerDistributedCache
protected void tearDown() throws IOException {
new File(firstCacheFile.toString()).delete();
new File(secondCacheFile.toString()).delete();
+ FileUtil.fullyDelete(new File(TEST_ROOT_DIR));
}
- private void assertFileLengthEquals(Path a, Path b)
+ protected void assertFileLengthEquals(Path a, Path b)
throws FileNotFoundException {
assertEquals("File sizes mismatch.",
pathToFile(a).length(), pathToFile(b).length());
}
- private File pathToFile(Path p) {
+ protected File pathToFile(Path p) {
return new File(p.toString());
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java?rev=1077116&r1=1077115&r2=1077116&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestIsolationRunner.java Fri Mar 4 03:42:38 2011
@@ -22,6 +22,8 @@ import java.io.File;
import java.io.IOException;
import java.util.UUID;
+import javax.security.auth.login.LoginException;
+
import junit.framework.TestCase;
import org.apache.hadoop.fs.FileStatus;
@@ -32,6 +34,7 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.security.UserGroupInformation;
/**
* Re-runs a map task using the IsolationRunner.
@@ -99,15 +102,19 @@ public class TestIsolationRunner extends
}
private Path getAttemptJobXml(JobConf conf, JobID jobId, boolean isMap)
- throws IOException {
+ throws IOException, LoginException {
String taskid =
new TaskAttemptID(new TaskID(jobId, isMap, 0), 0).toString();
return new LocalDirAllocator("mapred.local.dir").getLocalPathToRead(
- TaskTracker.getTaskConfFile(jobId.toString(), taskid, false), conf);
+ TaskTracker.getTaskConfFile(UserGroupInformation.login(conf)
+ .getUserName(), jobId.toString(), taskid, false), conf);
}
- public void testIsolationRunOfMapTask() throws
- IOException, InterruptedException, ClassNotFoundException {
+ public void testIsolationRunOfMapTask()
+ throws IOException,
+ InterruptedException,
+ ClassNotFoundException,
+ LoginException {
MiniMRCluster mr = null;
try {
mr = new MiniMRCluster(1, "file:///", 4);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java?rev=1077116&r1=1077115&r2=1077116&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLocalizationWithLinuxTaskController.java Fri Mar 4 03:42:38 2011
@@ -22,16 +22,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
-import javax.security.auth.login.LoginException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
-import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
-import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
-import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
/**
* Test to verify localization of a job and localization of a task on a
@@ -45,7 +40,6 @@ public class TestLocalizationWithLinuxTa
LogFactory.getLog(TestLocalizationWithLinuxTaskController.class);
private File configFile;
- private MyLinuxTaskController taskController;
private static String taskTrackerSpecialGroup;
@@ -66,10 +60,24 @@ public class TestLocalizationWithLinuxTa
ClusterWithLinuxTaskController.createTaskControllerConf(path,
localDirs);
String execPath = path + "/task-controller";
- taskController.setTaskControllerExe(execPath);
+ ((MyLinuxTaskController) taskController).setTaskControllerExe(execPath);
taskTrackerSpecialGroup = getFilePermissionAttrs(execPath)[2];
taskController.setConf(trackerFConf);
taskController.setup();
+
+ tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+ taskController));
+
+ // Rewrite conf so as to reflect task's correct user name.
+ String ugi =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+ JobConf jobConf = new JobConf(task.getConf());
+ jobConf.setUser(ugi.split(",")[0]);
+ File jobConfFile = uploadJobConf(jobConf);
+ // Create the task again to change the job-user
+ task =
+ new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
+ task.setConf(jobConf);
}
@Override
@@ -91,75 +99,114 @@ public class TestLocalizationWithLinuxTa
}
/**
+ * Test the localization of a user on the TT when {@link LinuxTaskController}
+ * is in use.
+ */
+ @Override
+ public void testUserLocalization()
+     throws IOException {
+
+   // Only meaningful when the LinuxTaskController set-up is available;
+   // otherwise the test is silently skipped.
+   if (!ClusterWithLinuxTaskController.shouldRun()) {
+     return;
+   }
+
+   // Delegate to the base-class test of the same name, as the other
+   // overridden tests in this class do. Delegating to testJobLocalization
+   // here would run the job-localization test a second time and leave user
+   // localization untested.
+   super.testUserLocalization();
+ }
+
+ @Override
+ protected void checkUserLocalization()
+     throws IOException {
+   // Check the directory structure and permissions. For every local
+   // directory, the user directory, its jobcache directory and the
+   // distributed-cache directory must exist and carry the expected mode
+   // string, the task's user as owner and the special TT group.
+   for (String dir : localDirs) {
+
+     File localDir = new File(dir);
+     assertTrue("mapred.local.dir " + localDir + " isn't created!",
+         localDir.exists());
+
+     File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+     assertTrue("taskTracker sub-dir in the local-dir " + localDir
+         + " is not created!", taskTrackerSubDir.exists());
+
+     File userDir = new File(taskTrackerSubDir, task.getUser());
+     assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+         + " is not created!", userDir.exists());
+     checkFilePermissions(userDir.getAbsolutePath(), "dr-xrws---", task
+         .getUser(), taskTrackerSpecialGroup);
+
+     File jobCache = new File(userDir, TaskTracker.JOBCACHE);
+     assertTrue("jobcache in the userDir " + userDir + " isn't created!",
+         jobCache.exists());
+     checkFilePermissions(jobCache.getAbsolutePath(), "dr-xrws---", task
+         .getUser(), taskTrackerSpecialGroup);
+
+     // Verify the distributed cache dir.
+     File distributedCacheDir =
+         new File(localDir, TaskTracker
+             .getDistributedCacheDir(task.getUser()));
+     assertTrue("distributed cache dir " + distributedCacheDir
+         + " doesn't exists!", distributedCacheDir.exists());
+     checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+         "dr-xrws---", task.getUser(), taskTrackerSpecialGroup);
+   }
+ }
+
+ /**
* Test job localization with {@link LinuxTaskController}. Also check the
* permissions and file ownership of the job related files.
*/
@Override
public void testJobLocalization()
- throws IOException,
- LoginException {
+ throws IOException {
if (!ClusterWithLinuxTaskController.shouldRun()) {
return;
}
- // Do job localization
- JobConf localizedJobConf = tracker.localizeJobFiles(task);
-
- String ugi =
- System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
- localizedJobConf.setUser(ugi.split(",")[0]);
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir
- JobInitializationContext context = new JobInitializationContext();
- context.jobid = jobId;
- context.user = localizedJobConf.getUser();
- context.workDir =
- new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
-
- // /////////// The method being tested
- taskController.initializeJob(context);
- // ///////////
+ super.testJobLocalization();
+ }
+ @Override
+ protected void checkJobLocalization()
+ throws IOException {
for (String localDir : trackerFConf.getStrings("mapred.local.dir")) {
File jobDir =
- new File(localDir, TaskTracker.getLocalJobDir(jobId.toString()));
+ new File(localDir, TaskTracker.getLocalJobDir(task.getUser(), jobId
+ .toString()));
// check the private permissions on the job directory
- checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---",
- localizedJobConf.getUser(), taskTrackerSpecialGroup);
+ checkFilePermissions(jobDir.getAbsolutePath(), "dr-xrws---", task
+ .getUser(), taskTrackerSpecialGroup);
}
// check the private permissions of various directories
List<Path> dirs = new ArrayList<Path>();
Path jarsDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(jobId
- .toString()), trackerFConf);
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarsDir(task.getUser(),
+ jobId.toString()), trackerFConf);
dirs.add(jarsDir);
dirs.add(new Path(jarsDir, "lib"));
for (Path dir : dirs) {
checkFilePermissions(dir.toUri().getPath(), "dr-xrws---",
- localizedJobConf.getUser(), taskTrackerSpecialGroup);
+ task.getUser(), taskTrackerSpecialGroup);
}
// job-work dir needs user writable permissions
Path jobWorkDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(jobId
- .toString()), trackerFConf);
- checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---",
- localizedJobConf.getUser(), taskTrackerSpecialGroup);
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(),
+ jobId.toString()), trackerFConf);
+ checkFilePermissions(jobWorkDir.toUri().getPath(), "drwxrws---", task
+ .getUser(), taskTrackerSpecialGroup);
// check the private permissions of various files
List<Path> files = new ArrayList<Path>();
- files.add(lDirAlloc.getLocalPathToRead(TaskTracker
- .getLocalJobConfFile(jobId.toString()), trackerFConf));
- files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
- .toString()), trackerFConf));
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalJobConfFile(
+ task.getUser(), jobId.toString()), trackerFConf));
+ files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task
+ .getUser(), jobId.toString()), trackerFConf));
files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib1.jar"));
files.add(new Path(jarsDir, "lib" + Path.SEPARATOR + "lib2.jar"));
for (Path file : files) {
- checkFilePermissions(file.toUri().getPath(), "-r-xrwx---",
- localizedJobConf.getUser(), taskTrackerSpecialGroup);
+ checkFilePermissions(file.toUri().getPath(), "-r-xrwx---", task
+ .getUser(), taskTrackerSpecialGroup);
}
}
@@ -169,73 +216,50 @@ public class TestLocalizationWithLinuxTa
*/
@Override
public void testTaskLocalization()
- throws IOException,
- LoginException {
+ throws IOException {
if (!ClusterWithLinuxTaskController.shouldRun()) {
return;
}
- JobConf localizedJobConf = tracker.localizeJobFiles(task);
- String ugi =
- System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
- localizedJobConf.setUser(ugi.split(",")[0]);
-
- // Now initialize the job via task-controller so as to set
- // ownership/permissions of jars, job-work-dir
- JobInitializationContext jobContext = new JobInitializationContext();
- jobContext.jobid = jobId;
- jobContext.user = localizedJobConf.getUser();
- jobContext.workDir =
- new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
- taskController.initializeJob(jobContext);
-
- TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
- tip.setJobConf(localizedJobConf);
-
- // localize the task.
- tip.localizeTask(task);
- TaskRunner runner = task.createRunner(tracker, tip);
- runner.setupChildTaskConfiguration(lDirAlloc);
- Path workDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
- .getJobID().toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask()), trackerFConf);
- TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
- localizedJobConf);
- File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
-
- // Initialize task
- TaskControllerContext taskContext =
- new TaskController.TaskControllerContext();
- taskContext.env =
- new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
- .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
- taskContext.task = task;
- // /////////// The method being tested
- taskController.initializeTask(taskContext);
- // ///////////
+ super.testTaskLocalization();
+ }
+ @Override
+ protected void checkTaskLocalization()
+ throws IOException {
// check the private permissions of various directories
List<Path> dirs = new ArrayList<Path>();
- dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(jobId
- .toString(), taskId.toString()), trackerFConf));
- dirs.add(workDir);
- dirs.add(new Path(workDir, "tmp"));
- dirs.add(new Path(logFiles[1].getParentFile().getAbsolutePath()));
+ dirs.add(lDirAlloc.getLocalPathToRead(TaskTracker.getLocalTaskDir(task
+ .getUser(), jobId.toString(), taskId.toString()), trackerFConf));
+ dirs.add(attemptWorkDir);
+ dirs.add(new Path(attemptWorkDir, "tmp"));
+ dirs.add(new Path(attemptLogFiles[1].getParentFile().getAbsolutePath()));
for (Path dir : dirs) {
checkFilePermissions(dir.toUri().getPath(), "drwxrws---",
- localizedJobConf.getUser(), taskTrackerSpecialGroup);
+ task.getUser(), taskTrackerSpecialGroup);
}
// check the private permissions of various files
List<Path> files = new ArrayList<Path>();
files.add(lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
- .getJobID().toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask()), trackerFConf));
+ .getUser(), task.getJobID().toString(), task.getTaskID().toString(),
+ task.isTaskCleanupTask()), trackerFConf));
for (Path file : files) {
- checkFilePermissions(file.toUri().getPath(), "-rwxrwx---",
- localizedJobConf.getUser(), taskTrackerSpecialGroup);
+ checkFilePermissions(file.toUri().getPath(), "-rwxrwx---", task
+ .getUser(), taskTrackerSpecialGroup);
+ }
+ }
+
+ /**
+ * Test cleanup of task files with {@link LinuxTaskController}.
+ */
+ @Override
+ public void testTaskCleanup()
+ throws IOException {
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return;
}
+ super.testTaskCleanup();
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java?rev=1077116&r1=1077115&r2=1077116&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFS.java Fri Mar 4 03:42:38 2011
@@ -27,6 +27,8 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
+import javax.security.auth.login.LoginException;
+
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
@@ -39,6 +41,8 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.util.StringUtils;
/**
* A JUnit test to test Mini Map-Reduce Cluster with Mini-DFS.
@@ -118,55 +122,108 @@ public class TestMiniMRWithDFS extends T
}
return result.toString();
}
-
+
/**
* Make sure that there are exactly the directories that we expect to find.
+ *
+ * <br/>
+ * <br/>
+ *
+ * For example, to check the existence of *only* the directories for user1's
+ * tasks job1-attempt1, job1-attempt2, job2-attempt1 and job3-attempt3, pass
+ * user1 as user, {job1, job1, job2, job3} as jobIds and {attempt1, attempt2,
+ * attempt1, attempt3} as taskDirs.
+ *
* @param mr the map-reduce cluster
+ * @param user the name of the job-owner
+ * @param jobIds the list of jobs
* @param taskDirs the task ids that should be present
*/
- static void checkTaskDirectories(MiniMRCluster mr,
- String[] jobIds,
- String[] taskDirs) {
+ static void checkTaskDirectories(MiniMRCluster mr, String user,
+ String[] jobIds, String[] taskDirs) {
+
mr.waitUntilIdle();
int trackers = mr.getNumTaskTrackers();
- List<String> neededDirs = new ArrayList<String>(Arrays.asList(taskDirs));
- boolean[] found = new boolean[taskDirs.length];
- for(int i=0; i < trackers; ++i) {
- int numNotDel = 0;
+
+ List<String> observedJobDirs = new ArrayList<String>();
+ List<String> observedFilesInsideJobDir = new ArrayList<String>();
+
+ for (int i = 0; i < trackers; ++i) {
+
+ // Verify that mapred-local-dir and its direct contents are valid
File localDir = new File(mr.getTaskTrackerLocalDir(i));
- LOG.debug("Tracker directory: " + localDir);
- File trackerDir = new File(localDir, TaskTracker.SUBDIR);
- assertTrue("local dir " + localDir + " does not exist.",
- localDir.isDirectory());
- assertTrue("task tracker dir " + trackerDir + " does not exist.",
- trackerDir.isDirectory());
- String contents[] = localDir.list();
- String trackerContents[] = trackerDir.list();
- for(int j=0; j < contents.length; ++j) {
- System.out.println("Local " + localDir + ": " + contents[j]);
- }
- for(int j=0; j < trackerContents.length; ++j) {
- System.out.println("Local jobcache " + trackerDir + ": " + trackerContents[j]);
- }
- for(int fileIdx = 0; fileIdx < contents.length; ++fileIdx) {
- String name = contents[fileIdx];
- if (!(TaskTracker.SUBDIR.equals(contents[fileIdx]))) {
- LOG.debug("Looking at " + name);
- assertTrue("Spurious directory " + name + " found in " +
- localDir, false);
+ assertTrue("Local dir " + localDir + " does not exist.", localDir
+ .isDirectory());
+ LOG.info("Verifying contents of mapred.local.dir "
+ + localDir.getAbsolutePath());
+
+ // Verify the contents (user-dir) of tracker-sub-dir
+ File trackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+ if (trackerSubDir.isDirectory()) {
+
+ // Verify contents of user-dir and populate the job-dirs/attempt-dirs
+ // lists
+ File userDir = new File(trackerSubDir, user);
+ if (userDir.isDirectory()) {
+ LOG.info("Verifying contents of user-dir "
+ + userDir.getAbsolutePath());
+ verifyContents(new String[] { TaskTracker.JOBCACHE,
+ TaskTracker.DISTCACHEDIR }, userDir.list());
+
+ File jobCacheDir =
+ new File(localDir, TaskTracker.getJobCacheSubdir(user));
+ String[] jobDirs = jobCacheDir.list();
+ observedJobDirs.addAll(Arrays.asList(jobDirs));
+
+ for (String jobDir : jobDirs) {
+ String[] attemptDirs = new File(jobCacheDir, jobDir).list();
+ observedFilesInsideJobDir.addAll(Arrays.asList(attemptDirs));
+ }
}
}
- for (int idx = 0; idx < neededDirs.size(); ++idx) {
- String name = neededDirs.get(idx);
- if (new File(new File(new File(trackerDir, TaskTracker.JOBCACHE),
- jobIds[idx]), name).isDirectory()) {
- found[idx] = true;
- numNotDel++;
- }
+ }
+
+ // Now verify that only expected job-dirs and attempt-dirs are present.
+ LOG.info("Verifying the list of job directories");
+ verifyContents(jobIds, observedJobDirs.toArray(new String[observedJobDirs
+ .size()]));
+ LOG.info("Verifying the list of task directories");
+ // All taskDirs should be present in the observed list. Other files like
+ // job.xml etc may be present too, we are not checking them here.
+ for (int j = 0; j < taskDirs.length; j++) {
+ assertTrue(
+ "Expected task-directory " + taskDirs[j] + " is not present!",
+ observedFilesInsideJobDir.contains(taskDirs[j]));
+ }
+ }
+
+ /**
+ * Check the list of expectedFiles against the list of observedFiles and make
+ * sure they both are the same. Duplicates can be present in either of the
+ * lists and all duplicate entries are treated as a single entity.
+ *
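+ * @param expectedFiles the names that must each appear in observedFiles
+ * @param observedFiles the names actually found; each must be an expected one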
+ */
+ private static void verifyContents(String[] expectedFiles,
+ String[] observedFiles) {
+ boolean[] foundExpectedFiles = new boolean[expectedFiles.length];
+ boolean[] validObservedFiles = new boolean[observedFiles.length];
+ for (int j = 0; j < observedFiles.length; ++j) {
+ for (int k = 0; k < expectedFiles.length; ++k) {
+ if (expectedFiles[k].equals(observedFiles[j])) {
+ foundExpectedFiles[k] = true;
+ validObservedFiles[j] = true;
+ }
}
}
- for(int i=0; i< found.length; i++) {
- assertTrue("Directory " + taskDirs[i] + " not found", found[i]);
+ for (int j = 0; j < foundExpectedFiles.length; j++) {
+ assertTrue("Expected file " + expectedFiles[j] + " not found",
+ foundExpectedFiles[j]);
+ }
+ for (int j = 0; j < validObservedFiles.length; j++) {
+ assertTrue("Unexpected file " + observedFiles[j] + " found",
+ validObservedFiles[j]);
}
}
@@ -176,7 +233,16 @@ public class TestMiniMRWithDFS extends T
NUM_MAPS, NUM_SAMPLES, jobconf).doubleValue();
double error = Math.abs(Math.PI - estimate);
assertTrue("Error in PI estimation "+error+" exceeds 0.01", (error < 0.01));
- checkTaskDirectories(mr, new String[]{}, new String[]{});
+ String userName = jobconf.getUser();
+ if (userName == null) {
+ try {
+ userName = UnixUserGroupInformation.login(jobconf).getUserName();
+ } catch (LoginException le) {
+ throw new IOException("Cannot get the login username : "
+ + StringUtils.stringifyException(le));
+ }
+ }
+ checkTaskDirectories(mr, userName, new String[] {}, new String[] {});
}
public static void runWordCount(MiniMRCluster mr, JobConf jobConf)
@@ -195,9 +261,19 @@ public class TestMiniMRWithDFS extends T
assertEquals("The\t1\nbrown\t1\nfox\t2\nhas\t1\nmany\t1\n" +
"quick\t1\nred\t1\nsilly\t1\nsox\t1\n", result.output);
JobID jobid = result.job.getID();
- TaskAttemptID taskid = new TaskAttemptID(new TaskID(jobid, true, 1),0);
- checkTaskDirectories(mr, new String[]{jobid.toString()},
- new String[]{taskid.toString()});
+ TaskAttemptID taskid = new TaskAttemptID(
+ new TaskID(jobid, true, 1),0);
+ String userName = jobConf.getUser();
+ if (userName == null) {
+ try {
+ userName = UnixUserGroupInformation.login(jobConf).getUserName();
+ } catch (LoginException le) {
+ throw new IOException("Cannot get the login username : "
+ + StringUtils.stringifyException(le));
+ }
+ }
+ checkTaskDirectories(mr, userName, new String[] { jobid.toString() },
+ new String[] { taskid.toString() });
// test with maps=0
jobConf = mr.createJobConf();
input = "owen is oom";
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java?rev=1077116&r1=1077115&r2=1077116&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestMiniMRWithDFSWithDistinctUsers.java Fri Mar 4 03:42:38 2011
@@ -128,6 +128,10 @@ public class TestMiniMRWithDFSWithDistin
Path outDir = new Path("/testing/distinct/output");
TestMiniMRClasspath.configureWordCount(fs, jobTrackerName, job1,
input, 2, 1, inDir, outDir);
+
+ job1 = createJobConf(job1, PI_UGI);
+ runJobAsUser(job1, PI_UGI);
+
JobConf job2 = mr.createJobConf();
Path inDir2 = new Path("/testing/distinct/input2");
Path outDir2 = new Path("/testing/distinct/output2");
@@ -135,8 +139,6 @@ public class TestMiniMRWithDFSWithDistin
input, 2, 1, inDir2, outDir2);
job2 = createJobConf(job2, WC_UGI);
runJobAsUser(job2, WC_UGI);
- JobConf wc = createJobConf(mr, WC_UGI);
- TestMiniMRWithDFS.runWordCount(mr, wc);
} finally {
if (dfs != null) { dfs.shutdown(); }
if (mr != null) { mr.shutdown();}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java?rev=1077116&r1=1077115&r2=1077116&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestQueueManager.java Fri Mar 4 03:42:38 2011
@@ -482,8 +482,8 @@ public class TestQueueManager extends Te
try {
conf.set("mapred.job.tracker", "localhost:"
+ miniMRCluster.getJobTrackerPort());
- JobClient jc = new JobClient(conf);
- jc.getJob(rjob.getJobID()).killJob();
+ JobClient client = new JobClient(miniMRCluster.createJobConf());
+ client.getJob(rjob.getID()).killJob();
if (!shouldSucceed) {
fail("should fail kill operation");
}
@@ -524,8 +524,8 @@ public class TestQueueManager extends Te
try {
conf.set("mapred.job.tracker", "localhost:"
+ miniMRCluster.getJobTrackerPort());
- JobClient jc = new JobClient(conf);
- jc.getJob(rjob.getJobID()).setJobPriority("VERY_LOW");
+ JobClient client = new JobClient(miniMRCluster.createJobConf());
+ client.getJob(rjob.getID()).setJobPriority("VERY_LOW");
if (!shouldSucceed) {
fail("changing priority should fail.");
}
@@ -605,6 +605,8 @@ public class TestQueueManager extends Te
if (shouldComplete) {
rJob = JobClient.runJob(jc);
} else {
+ // Job should be submitted as 'userInfo'. So both the client as well as
+ // the configuration should point to the same UGI.
rJob = new JobClient(jc).submitJob(jc);
}
return rJob;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java?rev=1077116&r1=1077115&r2=1077116&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTaskTrackerLocalization.java Fri Mar 4 03:42:38 2011
@@ -18,21 +18,27 @@
package org.apache.hadoop.mapred;
import java.io.File;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.jar.JarOutputStream;
import java.util.zip.ZipEntry;
-import javax.security.auth.login.LoginException;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.LocalDirAllocator;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.server.tasktracker.Localizer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapred.JvmManager.JvmEnv;
+import org.apache.hadoop.mapred.TaskController.JobInitializationContext;
+import org.apache.hadoop.mapred.TaskController.TaskControllerContext;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import junit.framework.TestCase;
@@ -53,20 +59,53 @@ public class TestTaskTrackerLocalization
LogFactory.getLog(TestTaskTrackerLocalization.class);
protected TaskTracker tracker;
+ protected UserGroupInformation taskTrackerUGI;
+ protected TaskController taskController;
protected JobConf trackerFConf;
+ private JobConf localizedJobConf;
protected JobID jobId;
protected TaskAttemptID taskId;
protected Task task;
protected String[] localDirs;
protected static LocalDirAllocator lDirAlloc =
new LocalDirAllocator("mapred.local.dir");
+ protected Path attemptWorkDir;
+ protected File[] attemptLogFiles;
+ protected JobConf localizedTaskConf;
+
+ class InlineCleanupQueue extends CleanupQueue {
+ List<Path> stalePaths = new ArrayList<Path>();
+
+ public InlineCleanupQueue() {
+ // do nothing
+ }
+
+ @Override
+ public void addToQueue(FileSystem fs, Path... paths) {
+ // delete in-line
+ for (Path p : paths) {
+ try {
+ LOG.info("Trying to delete the path " + p);
+ if (!fs.delete(p, true)) {
+ LOG.warn("Stale path " + p.toUri().getPath());
+ stalePaths.add(p);
+ }
+ } catch (IOException e) {
+ LOG.warn("Caught exception while deleting path "
+ + p.toUri().getPath());
+ LOG.info(StringUtils.stringifyException(e));
+ stalePaths.add(p);
+ }
+ }
+ }
+ }
@Override
protected void setUp()
throws Exception {
TEST_ROOT_DIR =
- new File(System.getProperty("test.build.data", "/tmp"),
- "testTaskTrackerLocalization");
+ new File(System.getProperty("test.build.data", "/tmp"), getClass()
+ .getSimpleName());
if (!TEST_ROOT_DIR.exists()) {
TEST_ROOT_DIR.mkdirs();
}
@@ -86,30 +125,27 @@ public class TestTaskTrackerLocalization
}
trackerFConf.setStrings("mapred.local.dir", localDirs);
- // Create the job jar file
- File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
- JarOutputStream jstream =
- new JarOutputStream(new FileOutputStream(jobJarFile));
- ZipEntry ze = new ZipEntry("lib/lib1.jar");
- jstream.putNextEntry(ze);
- jstream.closeEntry();
- ze = new ZipEntry("lib/lib2.jar");
- jstream.putNextEntry(ze);
- jstream.closeEntry();
- jstream.finish();
- jstream.close();
- trackerFConf.setJar(jobJarFile.toURI().toString());
+ // Create the job configuration. It is the same object as trackerFConf in this test.
+ JobConf jobConf = trackerFConf;
- // Create the job configuration file
- File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
- FileOutputStream out = new FileOutputStream(jobConfFile);
- trackerFConf.writeXml(out);
- out.close();
+ // JobClient sets the job credentials.
+ new JobClient().setUGIAndUserGroupNames(jobConf);
+
+ // JobClient uploads the job jar to the file system and sets it in the
+ // jobConf.
+ uploadJobJar(jobConf);
+
+ // JobClient uploads the jobConf to the file system.
+ File jobConfFile = uploadJobConf(jobConf);
// Set up the TaskTracker
tracker = new TaskTracker();
tracker.setConf(trackerFConf);
- tracker.systemFS = FileSystem.getLocal(trackerFConf); // for test case
+
+ // for test case system FS is the local FS
+ tracker.localFs = tracker.systemFS = FileSystem.getLocal(trackerFConf);
+
+ taskTrackerUGI = UserGroupInformation.login(trackerFConf);
// Set up the task to be localized
String jtIdentifier = "200907202331";
@@ -118,12 +154,53 @@ public class TestTaskTrackerLocalization
new TaskAttemptID(jtIdentifier, jobId.getId(), true, 1, 0);
task =
new MapTask(jobConfFile.toURI().toString(), taskId, 1, null, 1);
+ task.setConf(jobConf); // Set conf. Set user name in particular.
- TaskController taskController = new DefaultTaskController();
+ taskController = new DefaultTaskController();
taskController.setConf(trackerFConf);
taskController.setup();
+ tracker.setLocalizer(new Localizer(tracker.localFs, localDirs,
+ taskController));
}
+ /**
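+ * @param jobConf the job configuration on which the created job jar is set
+ * @throws IOException if writing the jar file fails
+ * @throws FileNotFoundException if the jar file cannot be opened for writing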
+ */
+ private void uploadJobJar(JobConf jobConf)
+ throws IOException,
+ FileNotFoundException {
+ File jobJarFile = new File(TEST_ROOT_DIR, "jobjar-on-dfs.jar");
+ JarOutputStream jstream =
+ new JarOutputStream(new FileOutputStream(jobJarFile));
+ ZipEntry ze = new ZipEntry("lib/lib1.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ ze = new ZipEntry("lib/lib2.jar");
+ jstream.putNextEntry(ze);
+ jstream.closeEntry();
+ jstream.finish();
+ jstream.close();
+ jobConf.setJar(jobJarFile.toURI().toString());
+ }
+
+ /**
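+ * @param jobConf the job configuration to write out as XML
+ * @return the file under TEST_ROOT_DIR that the configuration was written to
+ * @throws FileNotFoundException if the file cannot be opened for writing
+ * @throws IOException if writing the configuration fails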
+ */
+ protected File uploadJobConf(JobConf jobConf)
+ throws FileNotFoundException,
+ IOException {
+ File jobConfFile = new File(TEST_ROOT_DIR, "jobconf-on-dfs.xml");
+ FileOutputStream out = new FileOutputStream(jobConfFile);
+ jobConf.writeXml(out);
+ out.close();
+ return jobConfFile;
+ }
+
@Override
protected void tearDown()
throws Exception {
@@ -145,71 +222,71 @@ public class TestTaskTrackerLocalization
assertTrue("Path " + path + " has the permissions " + attrs[0]
+ " instead of the expected " + expectedPermissions, attrs[0]
.equals(expectedPermissions));
- assertTrue("Path " + path + " is not user owned not by "
- + expectedOwnerUser + " but by " + attrs[1], attrs[1]
- .equals(expectedOwnerUser));
- assertTrue("Path " + path + " is not group owned not by "
- + expectedOwnerGroup + " but by " + attrs[2], attrs[2]
- .equals(expectedOwnerGroup));
+ assertTrue("Path " + path + " is user owned not by " + expectedOwnerUser
+ + " but by " + attrs[1], attrs[1].equals(expectedOwnerUser));
+ assertTrue("Path " + path + " is group owned not by " + expectedOwnerGroup
+ + " but by " + attrs[2], attrs[2].equals(expectedOwnerGroup));
}
/**
* Verify the task-controller's setup functionality
*
* @throws IOException
- * @throws LoginException
*/
public void testTaskControllerSetup()
- throws IOException,
- LoginException {
+ throws IOException {
// Task-controller is already set up in the test's setup method. Now verify.
- UserGroupInformation ugi = UserGroupInformation.login(new JobConf());
for (String localDir : localDirs) {
// Verify the local-dir itself.
File lDir = new File(localDir);
assertTrue("localDir " + lDir + " doesn't exists!", lDir.exists());
- checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", ugi
- .getUserName(), ugi.getGroupNames()[0]);
-
- // Verify the distributed cache dir.
- File distributedCacheDir =
- new File(localDir, TaskTracker.getDistributedCacheDir());
- assertTrue("distributed cache dir " + distributedCacheDir
- + " doesn't exists!", distributedCacheDir.exists());
- checkFilePermissions(distributedCacheDir.getAbsolutePath(),
- "drwxr-xr-x", ugi.getUserName(), ugi.getGroupNames()[0]);
-
- // Verify the job cache dir.
- File jobCacheDir = new File(localDir, TaskTracker.getJobCacheSubdir());
- assertTrue("jobCacheDir " + jobCacheDir + " doesn't exists!",
- jobCacheDir.exists());
- checkFilePermissions(jobCacheDir.getAbsolutePath(), "drwxr-xr-x", ugi
- .getUserName(), ugi.getGroupNames()[0]);
+ checkFilePermissions(lDir.getAbsolutePath(), "drwxr-xr-x", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
}
// Verify the pemissions on the userlogs dir
File taskLog = TaskLog.getUserLogDir();
- checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", ugi
- .getUserName(), ugi.getGroupNames()[0]);
+ checkFilePermissions(taskLog.getAbsolutePath(), "drwxr-xr-x", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
}
/**
- * Test job localization on a TT. Tests localization of job.xml, job.jar and
- * corresponding setting of configuration.
+ * Test the localization of a user on the TT.
*
* @throws IOException
- * @throws LoginException
*/
- public void testJobLocalization()
- throws IOException,
- LoginException {
+ public void testUserLocalization()
+ throws IOException {
// /////////// The main method being tested
- JobConf localizedJobConf = tracker.localizeJobFiles(task);
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
// ///////////
- // Check the directory structure
+ // Check the directory structure and permissions
+ checkUserLocalization();
+
+ // For the sake of testing re-entrancy of initializeUserDirs(), we remove
+ // the user directories now and make sure that further calls of the method
+ // don't create directories any more.
+ for (String dir : localDirs) {
+ File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
+ FileUtil.fullyDelete(userDir);
+ }
+
+ // Now call the method again.
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
+
+ // Files should not be created now and so shouldn't be there anymore.
+ for (String dir : localDirs) {
+ File userDir = new File(dir, TaskTracker.getUserDir(task.getUser()));
+ assertFalse("Unexpectedly, user-dir " + userDir.getAbsolutePath()
+ + " exists!", userDir.exists());
+ }
+ }
+
+ protected void checkUserLocalization()
+ throws IOException {
for (String dir : localDirs) {
File localDir = new File(dir);
@@ -220,31 +297,87 @@ public class TestTaskTrackerLocalization
assertTrue("taskTracker sub-dir in the local-dir " + localDir
+ "is not created!", taskTrackerSubDir.exists());
- File jobCache = new File(taskTrackerSubDir, TaskTracker.JOBCACHE);
- assertTrue("jobcache in the taskTrackerSubdir " + taskTrackerSubDir
- + " isn'task created!", jobCache.exists());
+ File userDir = new File(taskTrackerSubDir, task.getUser());
+ assertTrue("user-dir in taskTrackerSubdir " + taskTrackerSubDir
+ + "is not created!", userDir.exists());
+ checkFilePermissions(userDir.getAbsolutePath(), "drwx------", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
+
+ File jobCache = new File(userDir, TaskTracker.JOBCACHE);
+ assertTrue("jobcache in the userDir " + userDir + " isn't created!",
+ jobCache.exists());
+ checkFilePermissions(jobCache.getAbsolutePath(), "drwx------", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
+
+ // Verify the distributed cache dir.
+ File distributedCacheDir =
+ new File(localDir, TaskTracker
+ .getDistributedCacheDir(task.getUser()));
+ assertTrue("distributed cache dir " + distributedCacheDir
+ + " doesn't exists!", distributedCacheDir.exists());
+ checkFilePermissions(distributedCacheDir.getAbsolutePath(),
+ "drwx------", task.getUser(), taskTrackerUGI.getGroupNames()[0]);
+ }
+ }
+
+ /**
+ * Test job localization on a TT. Tests localization of job.xml, job.jar and
+ * corresponding setting of configuration. Also test
+ * {@link TaskController#initializeJob(JobInitializationContext)}
+ *
+ * @throws IOException
+ */
+ public void testJobLocalization()
+ throws IOException {
+
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
+
+ // /////////// The main method being tested
+ localizedJobConf = tracker.localizeJobFiles(task);
+ // ///////////
+
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext context = new JobInitializationContext();
+ context.jobid = jobId;
+ context.user = task.getUser();
+ context.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+
+ // /////////// The method being tested
+ taskController.initializeJob(context);
+ // ///////////
+
+ checkJobLocalization();
+ }
+
+ protected void checkJobLocalization()
+ throws IOException {
+ // Check the directory structure
+ for (String dir : localDirs) {
+
+ File localDir = new File(dir);
+ File taskTrackerSubDir = new File(localDir, TaskTracker.SUBDIR);
+ File userDir = new File(taskTrackerSubDir, task.getUser());
+ File jobCache = new File(userDir, TaskTracker.JOBCACHE);
File jobDir = new File(jobCache, jobId.toString());
- assertTrue("job-dir in " + jobCache + " isn'task created!", jobDir
- .exists());
+ assertTrue("job-dir in " + jobCache + " isn't created!", jobDir.exists());
// check the private permissions on the job directory
- UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
- checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", ugi
- .getUserName(), ugi.getGroupNames()[0]);
+ checkFilePermissions(jobDir.getAbsolutePath(), "drwx------", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]);
}
// check the localization of job.xml
- LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
-
assertTrue("job.xml is not localized on this TaskTracker!!", lDirAlloc
- .getLocalPathToRead(TaskTracker.getLocalJobConfFile(jobId.toString()),
- trackerFConf) != null);
+ .getLocalPathToRead(TaskTracker.getLocalJobConfFile(task.getUser(),
+ jobId.toString()), trackerFConf) != null);
// check the localization of job.jar
Path jarFileLocalized =
- lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(jobId
- .toString()), trackerFConf);
+ lDirAlloc.getLocalPathToRead(TaskTracker.getJobJarFile(task.getUser(),
+ jobId.toString()), trackerFConf);
assertTrue("job.jar is not localized on this TaskTracker!!",
jarFileLocalized != null);
assertTrue("lib/lib1.jar is not unjarred on this TaskTracker!!", new File(
@@ -256,8 +389,8 @@ public class TestTaskTrackerLocalization
// check the creation of job work directory
assertTrue("job-work dir is not created on this TaskTracker!!", lDirAlloc
- .getLocalPathToRead(TaskTracker.getJobWorkDir(jobId.toString()),
- trackerFConf) != null);
+ .getLocalPathToRead(TaskTracker.getJobWorkDir(task.getUser(), jobId
+ .toString()), trackerFConf) != null);
// Check the setting of job.local.dir and job.jar which will eventually be
// used by the user's task
@@ -267,11 +400,11 @@ public class TestTaskTrackerLocalization
String localizedJobJar = localizedJobConf.getJar();
for (String localDir : localizedJobConf.getStrings("mapred.local.dir")) {
if (localizedJobLocalDir.equals(localDir + Path.SEPARATOR
- + TaskTracker.getJobWorkDir(jobId.toString()))) {
+ + TaskTracker.getJobWorkDir(task.getUser(), jobId.toString()))) {
jobLocalDirFlag = true;
}
if (localizedJobJar.equals(localDir + Path.SEPARATOR
- + TaskTracker.getJobJarFile(jobId.toString()))) {
+ + TaskTracker.getJobJarFile(task.getUser(), jobId.toString()))) {
mapredJarFlag = true;
}
}
@@ -287,13 +420,21 @@ public class TestTaskTrackerLocalization
* Test task localization on a TT.
*
* @throws IOException
- * @throws LoginException
*/
public void testTaskLocalization()
- throws IOException,
- LoginException {
+ throws IOException {
+
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
+ localizedJobConf = tracker.localizeJobFiles(task);
- JobConf localizedJobConf = tracker.localizeJobFiles(task);
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext jobContext = new JobInitializationContext();
+ jobContext.jobid = jobId;
+ jobContext.user = task.getUser();
+ jobContext.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+ taskController.initializeJob(jobContext);
TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
tip.setJobConf(localizedJobConf);
@@ -304,77 +445,194 @@ public class TestTaskTrackerLocalization
// check the functionality of localizeTask
for (String dir : trackerFConf.getStrings("mapred.local.dir")) {
- assertTrue("attempt-dir in localDir " + dir + " is not created!!",
- new File(dir, TaskTracker.getLocalTaskDir(jobId.toString(), taskId
- .toString())).exists());
+ File attemptDir =
+ new File(dir, TaskTracker.getLocalTaskDir(task.getUser(), jobId
+ .toString(), taskId.toString()));
+ assertTrue("attempt-dir " + attemptDir + " in localDir " + dir
+ + " is not created!!", attemptDir.exists());
}
- Path workDir =
- lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(task
- .getJobID().toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask()), trackerFConf);
+ attemptWorkDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+ task.getUser(), task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask()), trackerFConf);
assertTrue("atttempt work dir for " + taskId.toString()
- + " is not created in any of the configured dirs!!", workDir != null);
+ + " is not created in any of the configured dirs!!",
+ attemptWorkDir != null);
TaskRunner runner = task.createRunner(tracker, tip);
// /////// Few more methods being tested
runner.setupChildTaskConfiguration(lDirAlloc);
- TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+ TaskRunner.createChildTmpDir(new File(attemptWorkDir.toUri().getPath()),
localizedJobConf);
- File[] logFiles = TaskRunner.prepareLogFiles(task.getTaskID());
- // ///////
+ attemptLogFiles = TaskRunner.prepareLogFiles(task.getTaskID());
// Make sure the task-conf file is created
Path localTaskFile =
lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
- .getJobID().toString(), task.getTaskID().toString(), task
- .isTaskCleanupTask()), trackerFConf);
+ .getUser(), task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask()), trackerFConf);
assertTrue("Task conf file " + localTaskFile.toString()
+ " is not created!!", new File(localTaskFile.toUri().getPath())
.exists());
// /////// One more method being tested. This happens in child space.
- JobConf localizedTaskConf = new JobConf(localTaskFile);
+ localizedTaskConf = new JobConf(localTaskFile);
TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
// ///////
+ // Initialize task via TaskController
+ TaskControllerContext taskContext =
+ new TaskController.TaskControllerContext();
+ taskContext.env =
+ new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+ .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+ taskContext.task = task;
+ // /////////// The method being tested
+ taskController.initializeTask(taskContext);
+ // ///////////
+
+ checkTaskLocalization();
+ }
+
+ protected void checkTaskLocalization()
+ throws IOException { // checks state set up by the caller: localizedTaskConf, attemptWorkDir, attemptLogFiles
// Make sure that the mapred.local.dir is sandboxed
for (String childMapredLocalDir : localizedTaskConf
.getStrings("mapred.local.dir")) {
assertTrue("Local dir " + childMapredLocalDir + " is not sandboxed !!",
- childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(jobId
- .toString(), taskId.toString(), false)));
+ childMapredLocalDir.endsWith(TaskTracker.getLocalTaskDir(task
+ .getUser(), jobId.toString(), taskId.toString(), false)));
}
// Make sure task task.getJobFile is changed and pointed correctly.
assertTrue(task.getJobFile().endsWith(
- TaskTracker
- .getTaskConfFile(jobId.toString(), taskId.toString(), false)));
+ TaskTracker.getTaskConfFile(task.getUser(), jobId.toString(), taskId
+ .toString(), false)));
// Make sure that the tmp directories are created
assertTrue("tmp dir is not created in workDir "
- + workDir.toUri().getPath(),
- new File(workDir.toUri().getPath(), "tmp").exists());
+ + attemptWorkDir.toUri().getPath(), new File(attemptWorkDir.toUri()
+ .getPath(), "tmp").exists());
- // Make sure that the log are setup properly
+ // Make sure that the logs are set up properly
File logDir =
new File(HADOOP_LOG_DIR, TaskLog.USERLOGS_DIR_NAME + Path.SEPARATOR
+ task.getTaskID().toString());
assertTrue("task's log dir " + logDir.toString() + " doesn't exist!",
logDir.exists());
- UserGroupInformation ugi = UserGroupInformation.login(localizedJobConf);
- checkFilePermissions(logDir.getAbsolutePath(), "drwx------", ugi
- .getUserName(), ugi.getGroupNames()[0]);
+ checkFilePermissions(logDir.getAbsolutePath(), "drwx------", task
+ .getUser(), taskTrackerUGI.getGroupNames()[0]); // owner: the task's user; group: first group of taskTrackerUGI
File expectedStdout = new File(logDir, TaskLog.LogName.STDOUT.toString());
assertTrue("stdout log file is improper. Expected : "
- + expectedStdout.toString() + " Observed : " + logFiles[0].toString(),
- expectedStdout.toString().equals(logFiles[0].toString()));
+ + expectedStdout.toString() + " Observed : "
+ + attemptLogFiles[0].toString(), expectedStdout.toString().equals(
+ attemptLogFiles[0].toString())); // index 0 is compared against the stdout log
File expectedStderr =
new File(logDir, Path.SEPARATOR + TaskLog.LogName.STDERR.toString());
assertTrue("stderr log file is improper. Expected : "
- + expectedStderr.toString() + " Observed : " + logFiles[1].toString(),
- expectedStderr.toString().equals(logFiles[1].toString()));
+ + expectedStderr.toString() + " Observed : "
+ + attemptLogFiles[1].toString(), expectedStderr.toString().equals(
+ attemptLogFiles[1].toString())); // index 1 is compared against the stderr log
+ }
+
+ /** Localizes a job and one task attempt, removes their files, and verifies what is left behind.
+ * @throws IOException if any localization or cleanup step below fails with one
+ */
+ public void testTaskCleanup()
+ throws IOException {
+
+ // Localize job and localize task.
+ tracker.getLocalizer().initializeUserDirs(task.getUser());
+ localizedJobConf = tracker.localizeJobFiles(task);
+ // Now initialize the job via task-controller so as to set
+ // ownership/permissions of jars, job-work-dir
+ JobInitializationContext jobContext = new JobInitializationContext();
+ jobContext.jobid = jobId;
+ jobContext.user = localizedJobConf.getUser();
+ jobContext.workDir =
+ new File(localizedJobConf.get(TaskTracker.JOB_LOCAL_DIR));
+ taskController.initializeJob(jobContext);
+ TaskInProgress tip = tracker.new TaskInProgress(task, trackerFConf);
+ tip.setJobConf(localizedJobConf);
+ tip.localizeTask(task);
+ Path workDir =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskWorkDir(
+ task.getUser(), task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask()), trackerFConf);
+ TaskRunner runner = task.createRunner(tracker, tip);
+ tip.setTaskRunner(runner);
+ runner.setupChildTaskConfiguration(lDirAlloc);
+ TaskRunner.createChildTmpDir(new File(workDir.toUri().getPath()),
+ localizedJobConf);
+ TaskRunner.prepareLogFiles(task.getTaskID());
+ Path localTaskFile =
+ lDirAlloc.getLocalPathToRead(TaskTracker.getTaskConfFile(task
+ .getUser(), task.getJobID().toString(), task.getTaskID()
+ .toString(), task.isTaskCleanupTask()), trackerFConf);
+ JobConf localizedTaskConf = new JobConf(localTaskFile); // local variable; not the member read by checkTaskLocalization()
+ TaskRunner.setupChildMapredLocalDirs(task, localizedTaskConf);
+ TaskControllerContext taskContext =
+ new TaskController.TaskControllerContext();
+ taskContext.env =
+ new JvmEnv(null, null, null, null, -1, new File(localizedJobConf
+ .get(TaskTracker.JOB_LOCAL_DIR)), null, localizedJobConf);
+ taskContext.task = task;
+ // /////////// Initialize the task via TaskController (setup for the cleanup below)
+ taskController.initializeTask(taskContext);
+
+ // TODO: Let the task run and create files.
+
+ InlineCleanupQueue cleanupQueue = new InlineCleanupQueue();
+ tracker.directoryCleanupThread = cleanupQueue; // its stalePaths collection is asserted on below
+
+ // ////////// The central methods being tested
+ tip.removeTaskFiles(true, taskId);
+ tracker.removeJobFiles(task.getUser(), jobId.toString());
+ // //////////
+
+ // TODO: make sure that all files intended to be deleted are deleted.
+
+ assertTrue("Some task files are not deleted!! Number of stale paths is "
+ + cleanupQueue.stalePaths.size(), cleanupQueue.stalePaths.size() == 0);
+
+ // Check that the empty $mapred.local.dir/taskTracker/$user dirs are still
+ // there.
+ for (String localDir : localDirs) {
+ Path userDir =
+ new Path(localDir, TaskTracker.getUserDir(task.getUser()));
+ assertTrue("User directory " + userDir + " is not present!!",
+ tracker.localFs.exists(userDir));
+ }
+
+ // Test userlogs cleanup.
+ verifyUserLogsCleanup();
+ }
+
+ /**
+ * Verifies that the task's userlogs directory exists and that TaskLog.cleanup(-1) removes it.
+ *
+ * @throws IOException if checking the directory or cleaning up the logs fails with one
+ */
+ private void verifyUserLogsCleanup()
+ throws IOException {
+ Path logDir =
+ new Path(HADOOP_LOG_DIR.toURI().getPath(), TaskLog.USERLOGS_DIR_NAME
+ + Path.SEPARATOR + task.getTaskID().toString());
+
+ // Logs should be there before cleanup.
+ assertTrue("Userlogs dir " + logDir + " is not present as expected!!",
+ tracker.localFs.exists(logDir));
+
+ // ////////// Another method being tested
+ TaskLog.cleanup(-1); // -1 so as to move purgeTimeStamp to future and file
+ // modification time behind retainTimeStamp
+ // //////////
+
+ // Logs should be gone after cleanup.
+ assertFalse("Userlogs dir " + logDir + " is not deleted as expected!!",
+ tracker.localFs.exists(logDir));
}
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java?rev=1077116&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTrackerDistributedCacheManagerWithLinuxTaskController.java Fri Mar 4 03:42:38 2011
@@ -0,0 +1,186 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import javax.security.auth.login.LoginException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.ClusterWithLinuxTaskController.MyLinuxTaskController;
+import org.apache.hadoop.filecache.TestTrackerDistributedCacheManager;
+
+/**
+ * Test the DistributedCacheManager when LinuxTaskController is used.
+ * Fixture and test methods return early unless ClusterWithLinuxTaskController.shouldRun() is true.
+ */
+public class TestTrackerDistributedCacheManagerWithLinuxTaskController extends
+ TestTrackerDistributedCacheManager {
+
+ private File configFile; // task-controller conf created in setUp(), deleted in tearDown()
+ private MyLinuxTaskController taskController; // handed to the base test via getTaskController()
+ private String taskTrackerSpecialGroup; // expected group in the permission checks below
+
+ private static final Log LOG =
+ LogFactory
+ .getLog(TestTrackerDistributedCacheManagerWithLinuxTaskController.class);
+
+ @Override
+ protected void setUp()
+ throws IOException {
+
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return; // nothing is initialized when these tests are not supposed to run
+ }
+
+ TEST_ROOT_DIR =
+ new File(System.getProperty("test.build.data", "/tmp"),
+ TestTrackerDistributedCacheManagerWithLinuxTaskController.class
+ .getSimpleName()).getAbsolutePath();
+
+ super.setUp(); // runs after TEST_ROOT_DIR is replaced; presumably the base setUp reads it - confirm
+
+ taskController = new MyLinuxTaskController();
+ String path =
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_PATH);
+ configFile =
+ ClusterWithLinuxTaskController.createTaskControllerConf(path, conf
+ .getStrings(JobConf.MAPRED_LOCAL_DIR_PROPERTY));
+ String execPath = path + "/task-controller";
+ taskController.setTaskControllerExe(execPath);
+ taskController.setConf(conf);
+ taskController.setup();
+
+ taskTrackerSpecialGroup =
+ TestTaskTrackerLocalization.getFilePermissionAttrs(execPath)[2]; // third attribute of the executable
+ }
+
+ @Override
+ protected void tearDown()
+ throws IOException {
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return; // setUp() did nothing in this case, so there is nothing to undo
+ }
+ if (configFile != null) {
+ configFile.delete(); // best effort: the boolean result is ignored
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Test the control flow of distributed cache manager when LinuxTaskController
+ * is used.
+ */
+ @Override
+ public void testManagerFlow()
+ throws IOException,
+ LoginException {
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return; // skipped unless LinuxTaskController tests are enabled
+ }
+
+ super.testManagerFlow();
+ }
+
+ @Override
+ protected String getJobOwnerName() {
+ String ugi = // null if the property is unset, in which case split() below throws NullPointerException
+ System.getProperty(ClusterWithLinuxTaskController.TASKCONTROLLER_UGI);
+ String userName = ugi.split(",")[0]; // the user name is the part before the first comma
+ return userName;
+ }
+
+ @Override
+ protected TaskController getTaskController() {
+ return taskController;
+ }
+
+ @Override
+ protected void checkFilePermissions(Path[] localCacheFiles)
+ throws IOException {
+ String cachedFirstFile = localCacheFiles[0].toUri().getPath();
+ String cachedSecondFile = localCacheFiles[1].toUri().getPath(); // expects at least two localized files
+ String userName = getJobOwnerName();
+
+ // First make sure that the cache files have proper permissions.
+ TestTaskTrackerLocalization.checkFilePermissions(cachedFirstFile,
+ "-r-xrwx---", userName, taskTrackerSpecialGroup);
+ TestTaskTrackerLocalization.checkFilePermissions(cachedSecondFile,
+ "-r-xrwx---", userName, taskTrackerSpecialGroup);
+
+ // Now make sure that all the path components also have proper
+ // permissions.
+ checkPermissionOnPathComponents(cachedFirstFile, userName);
+ checkPermissionOnPathComponents(cachedSecondFile, userName);
+ }
+
+ /** Checks "dr-xrws---" on each directory from the file's parent up to, but excluding, the leading dir.
+ * @param cachedFilePath path of a localized cache file
+ * @param userName expected owner of every directory checked
+ * @throws IOException presumably from TestTaskTrackerLocalization.checkFilePermissions - confirm
+ */
+ private void checkPermissionOnPathComponents(String cachedFilePath,
+ String userName)
+ throws IOException {
+ // The trailing distcache/file/... string; literal path parts are quoted so regex metacharacters cannot break the match
+ String trailingStringForFirstFile =
+ cachedFilePath.replaceFirst(
+ java.util.regex.Pattern.quote(ROOT_MAPRED_LOCAL_DIR.getAbsolutePath())
+ + Path.SEPARATOR + "0_[0-" + (numLocalDirs - 1) + "]" + Path.SEPARATOR
+ + java.util.regex.Pattern.quote(TaskTracker.getDistributedCacheDir(userName)), "");
+ LOG.info("Trailing path for cacheFirstFile is : "
+ + trailingStringForFirstFile);
+ // The leading mapred.local.dir/0_[0-n]/taskTracker/$user string.
+ String leadingStringForFirstFile =
+ cachedFilePath.substring(0, cachedFilePath
+ .lastIndexOf(trailingStringForFirstFile));
+ LOG.info("Leading path for cacheFirstFile is : "
+ + leadingStringForFirstFile);
+
+ // Now check path permissions, starting with cache file's parent dir.
+ File path = new File(cachedFilePath).getParentFile();
+ while (!path.getAbsolutePath().equals(leadingStringForFirstFile)) {
+ TestTaskTrackerLocalization.checkFilePermissions(path.getAbsolutePath(),
+ "dr-xrws---", userName, taskTrackerSpecialGroup);
+ path = path.getParentFile(); // walk one level up towards the leading directory
+ }
+ }
+
+ @Override
+ public void testDeleteCache()
+ throws Exception {
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return; // skipped unless LinuxTaskController tests are enabled
+ }
+ super.testDeleteCache();
+ }
+
+ @Override
+ public void testFileSystemOtherThanDefault()
+ throws Exception {
+ if (!ClusterWithLinuxTaskController.shouldRun()) {
+ return; // skipped unless LinuxTaskController tests are enabled
+ }
+ super.testFileSystemOtherThanDefault();
+ }
+}