Posted to mapreduce-commits@hadoop.apache.org by co...@apache.org on 2010/06/21 21:02:51 UTC
svn commit: r956666 [3/4] - in /hadoop/mapreduce/trunk: ./ ivy/
src/test/aop/build/ src/test/mapred/org/apache/hadoop/mapred/
src/test/mapred/testjar/ src/test/system/ src/test/system/aop/
src/test/system/aop/org/ src/test/system/aop/org/apache/ src/te...
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheModifiedFile.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,346 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Collection;
+import java.util.ArrayList;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Verify the Distributed Cache functionality. This test scenario covers the
+ * behaviour of a distributed cache file that is modified between accesses
+ * by at most two jobs. Once a job uses a distributed cache file, that
+ * file is stored in the mapred.local.dir. If the next job uses the same file,
+ * but with a different timestamp, then that file is stored again. So, if two
+ * jobs choose the same tasktracker for their job execution, the distributed
+ * cache file should be found twice.
+ *
+ * This testcase runs a job with a distributed cache file. The handle of each
+ * task's corresponding tasktracker is obtained and checked for the presence of
+ * the distributed cache file with proper permissions in the proper directory.
+ * When the job runs again and any of its tasks hits the same tasktracker that
+ * ran a task of the previous job, the file should be uploaded again
+ * and the task should not use the old file. This is verified.
+ */
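+
+// A minimal client-side sketch of the cache setup this test exercises
+// (illustration only; it mirrors the calls made in the test below):
+//
+//   Configuration conf = new Configuration();
+//   DistributedCache.createSymlink(conf);
+//   DistributedCache.addCacheFile(URI.create("hdfs:///tmp/test.txt"), conf);
+//   // The tasktracker localizes the file once; if the HDFS modification
+//   // time changes before the next job, it is localized a second time.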
+
+public class TestDistributedCacheModifiedFile {
+
+ private static MRCluster cluster = null;
+ private static FileSystem dfs = null;
+ private static FileSystem ttFs = null;
+ private static JobClient client = null;
+ private static FsPermission permission = new FsPermission((short) 00777);
+
+ private static String uriPath = "hdfs:///tmp/test.txt";
+ private static final Path URIPATH = new Path(uriPath);
+ private String distributedFileName = "test.txt";
+
+ static final Log LOG =
+ LogFactory.getLog(TestDistributedCacheModifiedFile.class);
+
+ public TestDistributedCacheModifiedFile() throws Exception {
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = MRCluster.createCluster(new Configuration());
+ cluster.setUp();
+ client = cluster.getJTClient().getClient();
+ dfs = client.getFs();
+ // Deleting the file if it already exists
+ dfs.delete(URIPATH, true);
+
+ Collection<TTClient> tts = cluster.getTTClients();
+ // Stopping all TTs
+ for (TTClient tt : tts) {
+ tt.kill();
+ }
+ // Starting all TTs
+ for (TTClient tt : tts) {
+ tt.start();
+ }
+ // Waiting for 5 seconds to make sure tasktrackers are ready
+ Thread.sleep(5000);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ cluster.tearDown();
+ dfs.delete(URIPATH, true);
+
+ Collection<TTClient> tts = cluster.getTTClients();
+ // Stopping all TTs
+ for (TTClient tt : tts) {
+ tt.kill();
+ }
+ // Starting all TTs
+ for (TTClient tt : tts) {
+ tt.start();
+ }
+ }
+
+ /**
+ * Tests the Distributed Cache with a file that is modified between job runs.
+ */
+ @Test
+ public void testDistributedCache() throws Exception {
+ Configuration conf = new Configuration(cluster.getConf());
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
+
+ // This counter guards the wait loop below,
+ // which might otherwise become infinite.
+ int count = 0;
+ // This boolean will decide whether to run job again
+ boolean continueLoop = true;
+ // counter for job Loop
+ int countLoop = 0;
+ // This counter increases with all the tasktrackers in which tasks ran
+ int taskTrackerCounter = 0;
+ // This will store all the tasktrackers in which tasks ran
+ ArrayList<String> taskTrackerCollection = new ArrayList<String>();
+ // This boolean tells whether two tasks ran on the same tasktracker
+ boolean taskTrackerFound = false;
+
+ do {
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ Job slpJob = job.createJob(5, 1, 1000, 1000, 100, 100);
+
+ // Before starting, Modify the file
+ String input = "This will be the content of\n" + "distributed cache\n";
+ // Creating the path with the file
+ DataOutputStream file =
+ UtilsForTests.createTmpFileDFS(dfs, URIPATH, permission, input);
+
+ DistributedCache.createSymlink(conf);
+ URI uri = URI.create(uriPath);
+ DistributedCache.addCacheFile(uri, conf);
+ JobConf jconf = new JobConf(conf);
+
+ // Controls the job till all verification is done
+ FinishTaskControlAction.configureControlActionForJob(conf);
+
+ // Submitting the job
+ slpJob.submit();
+ RunningJob rJob =
+ cluster.getJTClient().getClient().getJob(
+ org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+
+ // counter for job Loop
+ countLoop++;
+
+ TTClient tClient = null;
+ JobInfo jInfo = wovenClient.getJobInfo(rJob.getID());
+ LOG.info("jInfo is :" + jInfo);
+
+ // Assert if jobInfo is null
+ Assert.assertNotNull("jobInfo is null", jInfo);
+
+ // Wait for the job to start running.
+ count = 0;
+ while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+ UtilsForTests.waitFor(10000);
+ count++;
+ jInfo = wovenClient.getJobInfo(rJob.getID());
+ // If the count goes beyond a point, then fail; this is to avoid an
+ // infinite loop under unforeseen circumstances.
+ if (count > 10) {
+ Assert.fail("job has not reached running state for more than "
+ + "100 seconds. Failing at this point");
+ }
+ }
+
+ LOG.info("job id is :" + rJob.getID().toString());
+
+ TaskInfo[] taskInfos =
+ cluster.getJTClient().getProxy().getTaskInfo(rJob.getID());
+
+ boolean distCacheFileIsFound;
+
+ for (TaskInfo taskInfo : taskInfos) {
+ distCacheFileIsFound = false;
+ String[] taskTrackers = taskInfo.getTaskTrackers();
+ for (String taskTracker : taskTrackers) {
+ // Formatting tasktracker to get just its FQDN
+ taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+ LOG.info("taskTracker is :" + taskTracker);
+
+ // The tasktrackerFound variable is initialized
+ taskTrackerFound = false;
+
+ // This will be entered from the second job onwards
+ if (countLoop > 1) {
+ if (taskTracker != null) {
+ continueLoop = taskTrackerCollection.contains(taskTracker);
+ }
+ if (continueLoop) {
+ taskTrackerFound = true;
+ }
+ }
+ // Collecting the tasktrackers
+ if (taskTracker != null)
+ taskTrackerCollection.add(taskTracker);
+
+ // We have looped through twice to look for tasks
+ // getting submitted on the same tasktrackers. The same tasktracker
+ // for subsequent jobs may not have been hit because there are many
+ // tasktrackers. So, the testcase has to stop here.
+ if (countLoop > 1) {
+ continueLoop = false;
+ }
+
+ tClient = cluster.getTTClient(taskTracker);
+
+ // tClient may be null because the task is already dead. Ex: setup
+ if (tClient == null) {
+ continue;
+ }
+
+ String[] localDirs = tClient.getMapredLocalDirs();
+ int distributedFileCount = 0;
+ // Go to every single path
+ for (String localDir : localDirs) {
+ // Public Distributed cache will always be stored under
+ // mapred.local.dir/tasktracker/archive
+ localDir =
+ localDir
+ + Path.SEPARATOR
+ + TaskTracker.getPublicDistributedCacheDir();
+ LOG.info("localDir is : " + localDir);
+
+ // Get file status of all the directories
+ // and files under that path.
+ FileStatus[] fileStatuses =
+ tClient.listStatus(localDir, true, true);
+ for (FileStatus fileStatus : fileStatuses) {
+ Path path = fileStatus.getPath();
+ LOG.info("path is :" + path.toString());
+ // Checking if the received path ends with
+ // the distributed filename
+ distCacheFileIsFound =
+ (path.toString()).endsWith(distributedFileName);
+ // If file is found, check for its permission.
+ // Since the file is found break out of loop
+ if (distCacheFileIsFound) {
+ LOG.info("PATH found is :" + path.toString());
+ distributedFileCount++;
+ String filename = path.getName();
+ FsPermission fsPerm = fileStatus.getPermission();
+ Assert.assertTrue("File Permission is not 777", fsPerm
+ .equals(new FsPermission("777")));
+ }
+ }
+ }
+
+ LOG.debug("The distributed FileCount is :" + distributedFileCount);
+ LOG.debug("The taskTrackerFound is :" + taskTrackerFound);
+
+ // If the distributed cache file is modified in dfs
+ // between two job runs, it can be present more than once
+ // on any of the tasktrackers on which the job ran.
+ if (distributedFileCount != 2 && taskTrackerFound) {
+ Assert.fail("The distributed cache file has to be present twice. "
+ + "But found was " + distributedFileCount);
+ } else if (distributedFileCount > 1 && !taskTrackerFound) {
+ Assert.fail("The distributed cache file cannot be present more than"
+ + " once. But found was " + distributedFileCount);
+ } else if (distributedFileCount < 1)
+ Assert.fail("The distributed cache file is less than one. "
+ + "But found was " + distributedFileCount);
+ if (!distCacheFileIsFound) {
+ Assert.fail("The distributed cache file does not exist");
+ }
+ }
+ }
+ // Allow the job to continue through MR control job.
+ for (TaskInfo taskInfoRemaining : taskInfos) {
+ FinishTaskControlAction action =
+ new FinishTaskControlAction(TaskID.downgrade(taskInfoRemaining
+ .getTaskID()));
+ Collection<TTClient> tts = cluster.getTTClients();
+ for (TTClient cli : tts) {
+ cli.getProxy().sendAction(action);
+ }
+ }
+
+ // Killing the job because all the verification needed
+ // for this testcase is completed.
+ rJob.killJob();
+
+ // Waiting for 3 seconds for cleanup to start
+ Thread.sleep(3000);
+
+ // Getting the last cleanup task's tasktracker also, as
+ // distributed cache gets uploaded even during cleanup.
+ TaskInfo[] myTaskInfos = wovenClient.getTaskInfo(rJob.getID());
+ if (myTaskInfos != null) {
+ for (TaskInfo info : myTaskInfos) {
+ if (info.isSetupOrCleanup()) {
+ String[] taskTrackers = info.getTaskTrackers();
+ for (String taskTracker : taskTrackers) {
+ // Formatting tasktracker to get just its FQDN
+ taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+ LOG.info("taskTracker is :" + taskTracker);
+ // Collecting the tasktrackers
+ if (taskTracker != null)
+ taskTrackerCollection.add(taskTracker);
+ }
+ }
+ }
+ }
+
+ // Making sure that the job is complete.
+ while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+ Thread.sleep(10000);
+ jInfo = wovenClient.getJobInfo(rJob.getID());
+ }
+
+ } while (continueLoop);
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCachePrivateFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCachePrivateFile.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCachePrivateFile.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCachePrivateFile.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,284 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Collection;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Verify the Distributed Cache functionality.
+ * This test scenario covers the behaviour of a distributed cache file
+ * when the file is private. Once a job uses a distributed
+ * cache file with private permissions, that file is stored in the
+ * mapred.local.dir, under a directory which has the same name
+ * as the job submitter's username. The directory has 700 permissions
+ * and the file under it should have 777 permissions.
+*/
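+
+// Sketch of the on-disk layout this test checks (illustration only; the
+// exact paths come from TaskTracker in the assertions below):
+//
+//   <mapred.local.dir>/taskTracker/<username>                -> mode 700
+//   <mapred.local.dir>/taskTracker/<username>/distcache/...  -> test.txt, 777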
+
+public class TestDistributedCachePrivateFile {
+
+ private static MRCluster cluster = null;
+ private static FileSystem dfs = null;
+ private static JobClient client = null;
+ private static FsPermission permission = new FsPermission((short)00770);
+
+ private static String uriPath = "hdfs:///tmp/test.txt";
+ private static final Path URIPATH = new Path(uriPath);
+ private String distributedFileName = "test.txt";
+
+ static final Log LOG = LogFactory.
+ getLog(TestDistributedCachePrivateFile.class);
+
+ public TestDistributedCachePrivateFile() throws Exception {
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = MRCluster.createCluster(new Configuration());
+ cluster.setUp();
+ client = cluster.getJTClient().getClient();
+ dfs = client.getFs();
+ //Deleting the file if it already exists
+ dfs.delete(URIPATH, true);
+
+ Collection<TTClient> tts = cluster.getTTClients();
+ //Stopping all TTs
+ for (TTClient tt : tts) {
+ tt.kill();
+ }
+ //Starting all TTs
+ for (TTClient tt : tts) {
+ tt.start();
+ }
+
+ String input = "This will be the content of\n" + "distributed cache\n";
+ //Creating the path with the file
+ DataOutputStream file =
+ UtilsForTests.createTmpFileDFS(dfs, URIPATH, permission, input);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ cluster.tearDown();
+ dfs.delete(URIPATH, true);
+
+ Collection<TTClient> tts = cluster.getTTClients();
+ //Stopping all TTs
+ for (TTClient tt : tts) {
+ tt.kill();
+ }
+ //Starting all TTs
+ for (TTClient tt : tts) {
+ tt.start();
+ }
+ }
+
+ /**
+ * Tests the Distributed Cache with a private file.
+ */
+ @Test
+ public void testDistributedCache() throws Exception {
+ Configuration conf = new Configuration(cluster.getConf());
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
+
+ //This counter guards the wait loop below,
+ //which might otherwise become infinite.
+ int count = 0;
+
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ Job slpJob = job.createJob(5, 1, 1000, 1000, 100, 100);
+
+ DistributedCache.createSymlink(conf);
+ URI uri = URI.create(uriPath);
+ DistributedCache.addCacheFile(uri, conf);
+ JobConf jconf = new JobConf(conf);
+
+ //Controls the job till all verification is done
+ FinishTaskControlAction.configureControlActionForJob(conf);
+
+ //Submitting the job
+ slpJob.submit();
+ RunningJob rJob =
+ cluster.getJTClient().getClient().getJob(org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+
+ JobStatus[] jobStatus = client.getAllJobs();
+ String userName = jobStatus[0].getUsername();
+
+ TTClient tClient = null;
+ JobInfo jInfo = wovenClient.getJobInfo(rJob.getID());
+ LOG.info("jInfo is :" + jInfo);
+
+ //Assert if jobInfo is null
+ Assert.assertNotNull("jobInfo is null", jInfo);
+
+ //Wait for the job to start running.
+ count = 0;
+ while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+ UtilsForTests.waitFor(10000);
+ count++;
+ jInfo = wovenClient.getJobInfo(rJob.getID());
+ //If the count goes beyond a point, then fail; this is to avoid an
+ //infinite loop under unforeseen circumstances.
+ if (count > 10) {
+ Assert.fail("job has not reached running state for more than " +
+ "100 seconds. Failing at this point");
+ }
+ }
+
+ LOG.info("job id is :" + rJob.getID().toString());
+
+ TaskInfo[] taskInfos = cluster.getJTClient().getProxy()
+ .getTaskInfo(rJob.getID());
+
+ boolean distCacheFileIsFound;
+
+ for (TaskInfo taskInfo : taskInfos) {
+ distCacheFileIsFound = false;
+ String[] taskTrackers = taskInfo.getTaskTrackers();
+
+ for(String taskTracker : taskTrackers) {
+ //Getting the exact FQDN of the tasktracker from
+ //the tasktracker string.
+ taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+ tClient = cluster.getTTClient(taskTracker);
+ String[] localDirs = tClient.getMapredLocalDirs();
+ int distributedFileCount = 0;
+ String localDirOnly = null;
+
+ boolean fileNotPresentForThisDirectoryPath = false;
+
+ //Go to every single path
+ for (String localDir : localDirs) {
+ fileNotPresentForThisDirectoryPath = false;
+ localDirOnly = localDir;
+
+ //The per-user directory is always located under
+ //mapred.local.dir/taskTracker/<username>
+ localDirOnly = localDir + Path.SEPARATOR + TaskTracker.SUBDIR +
+ Path.SEPARATOR + userName;
+
+ //Private Distributed cache will always be stored under
+ //mapred.local.dir/taskTracker/<username>/distcache
+ //Checking for username directory to check if it has the
+ //proper permissions
+ localDir = localDir + Path.SEPARATOR +
+ TaskTracker.getPrivateDistributedCacheDir(userName);
+
+ FileStatus fileStatusMapredLocalDirUserName = null;
+
+ try {
+ fileStatusMapredLocalDirUserName = tClient.
+ getFileStatus(localDirOnly, true);
+ } catch (Exception e) {
+ LOG.info("LocalDirOnly :" + localDirOnly + " not found");
+ fileNotPresentForThisDirectoryPath = true;
+ }
+
+ //The file will only be stored under one of the mapred.local.dir
+ //paths. If other paths were hit, just continue
+ if (fileNotPresentForThisDirectoryPath)
+ continue;
+
+ Path pathMapredLocalDirUserName =
+ fileStatusMapredLocalDirUserName.getPath();
+ FsPermission fsPermMapredLocalDirUserName =
+ fileStatusMapredLocalDirUserName.getPermission();
+ Assert.assertTrue("Directory Permission is not 700",
+ fsPermMapredLocalDirUserName.equals(new FsPermission("700")));
+
+ //Get file status of all the directories
+ //and files under that path.
+ FileStatus[] fileStatuses = tClient.listStatus(localDir,
+ true, true);
+ for (FileStatus fileStatus : fileStatuses) {
+ Path path = fileStatus.getPath();
+ LOG.info("path is :" + path.toString());
+ //Checking if the received path ends with
+ //the distributed filename
+ distCacheFileIsFound = (path.toString()).
+ endsWith(distributedFileName);
+ //If file is found, check for its permission.
+ //Since the file is found break out of loop
+ if (distCacheFileIsFound){
+ LOG.info("PATH found is :" + path.toString());
+ distributedFileCount++;
+ String filename = path.getName();
+ FsPermission fsPerm = fileStatus.getPermission();
+ Assert.assertTrue("File Permission is not 777",
+ fsPerm.equals(new FsPermission("777")));
+ }
+ }
+ }
+
+ LOG.info("Distributed File count is :" + distributedFileCount);
+
+ if (distributedFileCount > 1) {
+ Assert.fail("The distributed cache file is present more than once");
+ } else if (distributedFileCount < 1)
+ Assert.fail("The distributed cache file is less than one");
+ if (!distCacheFileIsFound) {
+ Assert.fail("The distributed cache file does not exist");
+ }
+ }
+
+ //Allow the job to continue through MR control job.
+ for (TaskInfo taskInfoRemaining : taskInfos) {
+ FinishTaskControlAction action = new FinishTaskControlAction(TaskID
+ .downgrade(taskInfoRemaining.getTaskID()));
+ Collection<TTClient> tts = cluster.getTTClients();
+ for (TTClient cli : tts) {
+ cli.getProxy().sendAction(action);
+ }
+ }
+
+ //Killing the job because all the verification needed
+ //for this testcase is completed.
+ rJob.killJob();
+ }
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestDistributedCacheUnModifiedFile.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,305 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.net.URI;
+import java.util.Collection;
+import java.util.ArrayList;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.UtilsForTests;
+
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.SleepJob;
+
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.AfterClass;
+import org.junit.Test;
+
+/**
+ * Verify the Distributed Cache functionality. This test scenario covers the
+ * behaviour of a distributed cache file that is not modified between
+ * accesses by at most two jobs. Once a job uses a distributed cache file,
+ * that file is stored in the mapred.local.dir. If the next job uses the same
+ * file, then it is not stored again. So, if two jobs choose the same
+ * tasktracker for their job execution, the distributed cache file should
+ * not be found twice.
+ *
+ * This testcase runs a job with a distributed cache file. The handle of each
+ * task's corresponding tasktracker is obtained and checked for the presence of
+ * the distributed cache file with proper permissions in the proper directory.
+ * When the job runs again and any of its tasks hits the same tasktracker that
+ * ran a task of the previous job, the file should not be uploaded again
+ * and the task should use the old file. This is verified.
+ */
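+
+// The invariant this test asserts (illustration only): an unmodified cache
+// file is localized at most once per tasktracker, so counting matches of
+// ".../test.txt" under each tasktracker's
+//   localDir + Path.SEPARATOR + TaskTracker.getPublicDistributedCacheDir()
+// must never yield more than one hit, even across two job runs.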
+
+public class TestDistributedCacheUnModifiedFile {
+
+ private static MRCluster cluster = null;
+ private static FileSystem dfs = null;
+ private static FileSystem ttFs = null;
+ private static JobClient client = null;
+ private static FsPermission permission = new FsPermission((short) 00777);
+
+ private static String uriPath = "hdfs:///tmp/test.txt";
+ private static final Path URIPATH = new Path(uriPath);
+ private String distributedFileName = "test.txt";
+
+ static final Log LOG =
+ LogFactory.getLog(TestDistributedCacheUnModifiedFile.class);
+
+ public TestDistributedCacheUnModifiedFile() throws Exception {
+ }
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = MRCluster.createCluster(new Configuration());
+ cluster.setUp();
+ client = cluster.getJTClient().getClient();
+ dfs = client.getFs();
+ // Deleting the file if it already exists
+ dfs.delete(URIPATH, true);
+
+ Collection<TTClient> tts = cluster.getTTClients();
+ // Stopping all TTs
+ for (TTClient tt : tts) {
+ tt.kill();
+ }
+ // Starting all TTs
+ for (TTClient tt : tts) {
+ tt.start();
+ }
+
+ // Waiting for 5 seconds to make sure tasktrackers are ready
+ Thread.sleep(5000);
+
+ String input = "This will be the content of\n" + "distributed cache\n";
+ // Creating the path with the file
+ DataOutputStream file =
+ UtilsForTests.createTmpFileDFS(dfs, URIPATH, permission, input);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ cluster.tearDown();
+ dfs.delete(URIPATH, true);
+
+ Collection<TTClient> tts = cluster.getTTClients();
+ // Stopping all TTs
+ for (TTClient tt : tts) {
+ tt.kill();
+ }
+ // Starting all TTs
+ for (TTClient tt : tts) {
+ tt.start();
+ }
+ }
+
+ /**
+ * Tests the Distributed Cache with a file that is not modified between runs.
+ */
+ @Test
+ public void testDistributedCache() throws Exception {
+ Configuration conf = new Configuration(cluster.getConf());
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
+
+ // This counter guards the wait loop below,
+ // which might otherwise become infinite.
+ int count = 0;
+ // This boolean will decide whether to run job again
+ boolean continueLoop = true;
+ // counter for job Loop
+ int countLoop = 0;
+ // This counter increases with all the tasktrackers in which tasks ran
+ int taskTrackerCounter = 0;
+ // This will store all the tasktrackers in which tasks ran
+ ArrayList<String> taskTrackerCollection = new ArrayList<String>();
+
+ do {
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ Job slpJob = job.createJob(5, 1, 1000, 1000, 100, 100);
+
+ DistributedCache.createSymlink(conf);
+ URI uri = URI.create(uriPath);
+ DistributedCache.addCacheFile(uri, conf);
+ JobConf jconf = new JobConf(conf);
+
+ // Controls the job till all verification is done
+ FinishTaskControlAction.configureControlActionForJob(conf);
+
+ // Submitting the job
+ slpJob.submit();
+ RunningJob rJob =
+ cluster.getJTClient().getClient().getJob(
+ org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+
+ // counter for job Loop
+ countLoop++;
+
+ TTClient tClient = null;
+ JobInfo jInfo = wovenClient.getJobInfo(rJob.getID());
+ LOG.info("jInfo is :" + jInfo);
+
+ // Assert if jobInfo is null
+ Assert.assertNotNull("jobInfo is null", jInfo);
+
+ // Wait for the job to start running.
+ count = 0;
+ while (jInfo.getStatus().getRunState() != JobStatus.RUNNING) {
+ UtilsForTests.waitFor(10000);
+ count++;
+ jInfo = wovenClient.getJobInfo(rJob.getID());
+ // If the count goes beyond a point, then fail; this is to avoid an
+ // infinite loop under unforeseen circumstances.
+ if (count > 10) {
+ Assert.fail("job has not reached running state for more than "
+ + "100 seconds. Failing at this point");
+ }
+ }
+
+ LOG.info("job id is :" + rJob.getID().toString());
+
+ TaskInfo[] taskInfos =
+ cluster.getJTClient().getProxy().getTaskInfo(rJob.getID());
+
+ boolean distCacheFileIsFound;
+
+ for (TaskInfo taskInfo : taskInfos) {
+ distCacheFileIsFound = false;
+ String[] taskTrackers = taskInfo.getTaskTrackers();
+ for (String taskTracker : taskTrackers) {
+ // Formatting tasktracker to get just its FQDN
+ taskTracker = UtilsForTests.getFQDNofTT(taskTracker);
+ LOG.info("taskTracker is :" + taskTracker);
+
+ // This will be entered from the second job onwards
+ if (countLoop > 1) {
+ if (taskTracker != null) {
+ continueLoop = taskTrackerCollection.contains(taskTracker);
+ }
+ if (!continueLoop) {
+ break;
+ }
+ }
+
+ // Collecting the tasktrackers
+ if (taskTracker != null)
+ taskTrackerCollection.add(taskTracker);
+
+ // We have looped through enough times to look for tasks
+ // getting submitted on the same tasktrackers. The same tasktracker
+ // for subsequent jobs may not have been hit because there are many
+ // tasktrackers. So, the testcase has to stop here.
+ if (countLoop > 2) {
+ continueLoop = false;
+ }
+
+ tClient = cluster.getTTClient(taskTracker);
+
+ // tClient may be null because the task is already dead. Ex: setup
+ if (tClient == null) {
+ continue;
+ }
+
+ String[] localDirs = tClient.getMapredLocalDirs();
+ int distributedFileCount = 0;
+ // Go to every single path
+ for (String localDir : localDirs) {
+ // Public Distributed cache will always be stored under
+ // mapred.local.dir/tasktracker/archive
+ localDir =
+ localDir
+ + Path.SEPARATOR
+ + TaskTracker.getPublicDistributedCacheDir();
+ LOG.info("localDir is : " + localDir);
+
+ // Get file status of all the directories
+ // and files under that path.
+ FileStatus[] fileStatuses =
+ tClient.listStatus(localDir, true, true);
+ for (FileStatus fileStatus : fileStatuses) {
+ Path path = fileStatus.getPath();
+ LOG.info("path is :" + path.toString());
+ // Checking if the received path ends with
+ // the distributed filename
+ distCacheFileIsFound =
+ (path.toString()).endsWith(distributedFileName);
+ // If file is found, check for its permission.
+ // Since the file is found break out of loop
+ if (distCacheFileIsFound) {
+ LOG.info("PATH found is :" + path.toString());
+ distributedFileCount++;
+ String filename = path.getName();
+ FsPermission fsPerm = fileStatus.getPermission();
+ Assert.assertTrue("File Permission is not 777", fsPerm
+ .equals(new FsPermission("777")));
+ }
+ }
+ }
+
+ // Since the distributed cache file is unmodified in dfs
+ // between two job runs, it should not be present more than once
+ // on any of the tasktrackers on which the job ran.
+ if (distributedFileCount > 1) {
+ Assert.fail("The distributed cache file is present more than once");
+ } else if (distributedFileCount < 1)
+ Assert.fail("The distributed cache file is less than one");
+ if (!distCacheFileIsFound) {
+ Assert.fail("The distributed cache file does not exist");
+ }
+ }
+ }
+ // Allow the job to continue through MR control job.
+ for (TaskInfo taskInfoRemaining : taskInfos) {
+ FinishTaskControlAction action =
+ new FinishTaskControlAction(TaskID.downgrade(taskInfoRemaining
+ .getTaskID()));
+ Collection<TTClient> tts = cluster.getTTClients();
+ for (TTClient cli : tts) {
+ cli.getProxy().sendAction(action);
+ }
+ }
+
+ // Killing the job because all the verification needed
+ // for this testcase is completed.
+ rJob.killJob();
+ } while (continueLoop);
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestFileOwner.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestFileOwner.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestFileOwner.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestFileOwner.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.MRConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.test.system.FinishTaskControlAction;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.mapreduce.test.system.TTClient;
+import org.apache.hadoop.mapreduce.test.system.TTInfo;
+import org.apache.hadoop.mapreduce.test.system.TTTaskInfo;
+import org.apache.hadoop.mapreduce.test.system.TaskInfo;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestFileOwner {
+ public static MRCluster cluster;
+
+ private StringBuffer jobIdDir = new StringBuffer();
+ private JTProtocol wovenClient = null;
+ private static final Log LOG = LogFactory.getLog(TestFileOwner.class);
+ private String taskController = null;
+ private final FsPermission PERM_777 = new FsPermission("777");
+ private final FsPermission PERM_755 = new FsPermission("755");
+ private final FsPermission PERM_644 = new FsPermission("644");
+
+ @BeforeClass
+ public static void setUp() throws java.lang.Exception {
+ cluster = MRCluster.createCluster(new Configuration());
+ cluster.setUp();
+ }
+
+ /*
+ * This test checks the permissions of the local files created under
+ * mapred.local.dir. Job control is used so that the tasks wait
+ * until they are signalled to finish.
+ *
+ * @throws Exception in case of test errors
+ */
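+ // The job-control handshake used below, in brief (illustration only;
+ // the calls appear in this test, "taskId" stands for a task's ID):
+ //
+ //   FinishTaskControlAction.configureControlActionForJob(conf);
+ //   // ...submit the job; its tasks now wait to be signalled...
+ //   FinishTaskControlAction action =
+ //       new FinishTaskControlAction(TaskID.downgrade(taskId));
+ //   ttClient.getProxy().sendAction(action); // releases the task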
+ @Test
+ public void testFilePermission() throws Exception {
+ wovenClient = cluster.getJTClient().getProxy();
+ Configuration conf = new Configuration(cluster.getConf());
+ FinishTaskControlAction.configureControlActionForJob(conf);
+ SleepJob job = new SleepJob();
+ job.setConf(conf);
+ Job slpJob = job.createJob(1, 0, 100, 100, 100, 100);
+ JobConf jconf = new JobConf(conf);
+ slpJob.submit();
+ RunningJob rJob =
+ cluster.getJTClient().getClient().getJob(
+ org.apache.hadoop.mapred.JobID.downgrade(slpJob.getJobID()));
+ taskController = conf.get(TTConfig.TT_TASK_CONTROLLER);
+ // get the job info so we can get the env variables from the daemon.
+ // Now wait for the task to be in the running state, only then the
+ // directories will be created
+ JobInfo info = wovenClient.getJobInfo(rJob.getID());
+ Assert.assertNotNull("JobInfo is null", info);
+ JobID id = rJob.getID();
+ while (info.runningMaps() != 1) {
+ Thread.sleep(1000);
+ info = wovenClient.getJobInfo(id);
+ }
+ TaskInfo[] myTaskInfos = wovenClient.getTaskInfo(id);
+ for (TaskInfo tInfo : myTaskInfos) {
+ if (!tInfo.isSetupOrCleanup()) {
+ String[] taskTrackers = tInfo.getTaskTrackers();
+ for (String taskTracker : taskTrackers) {
+ TTInfo ttInfo = wovenClient.getTTInfo(taskTracker);
+ TTClient ttCli = cluster.getTTClient(ttInfo.getStatus().getHost());
+ Assert.assertNotNull("TTClient instance is null", ttCli);
+ TTTaskInfo ttTaskInfo = ttCli.getProxy().getTask(tInfo.getTaskID());
+ Assert.assertNotNull("TTTaskInfo is null", ttTaskInfo);
+ while (ttTaskInfo.getTaskStatus().getRunState() != TaskStatus.State.RUNNING) {
+ Thread.sleep(100);
+ ttTaskInfo = ttCli.getProxy().getTask(tInfo.getTaskID());
+ }
+ testPermissionWithTaskController(ttCli, conf, info);
+ FinishTaskControlAction action =
+ new FinishTaskControlAction(TaskID.downgrade(tInfo.getTaskID()));
+ for (TTClient cli : cluster.getTTClients()) {
+ cli.getProxy().sendAction(action);
+ }
+ }
+ }
+ }
+ JobInfo jInfo = wovenClient.getJobInfo(id);
+ jInfo = cluster.getJTClient().getProxy().getJobInfo(id);
+ while (!jInfo.getStatus().isJobComplete()) {
+ Thread.sleep(100);
+ jInfo = cluster.getJTClient().getProxy().getJobInfo(id);
+ }
+ }
+
+ private void testPermissionWithTaskController(
+ TTClient tClient, Configuration conf, JobInfo info) {
+ Assert.assertNotNull("TTclient is null", tClient);
+ FsPermission fsPerm = null;
+ String[] pathInfo = conf.getStrings(MRConfig.LOCAL_DIR);
+ for (int i = 0; i < pathInfo.length; i++) {
+ // First verify the jobid directory exists
+ jobIdDir = new StringBuffer();
+ String userName = null;
+ try {
+ JobStatus[] jobStatus = cluster.getJTClient().getClient().getAllJobs();
+ userName = jobStatus[0].getUsername();
+ } catch (Exception ex) {
+ LOG.error("Failed to get user name");
+ boolean status = false;
+ Assert.assertTrue("Failed to get the userName", status);
+ }
+ jobIdDir.append(pathInfo[i]).append(Path.SEPARATOR);
+ jobIdDir.append(TaskTracker.getLocalJobDir(userName, info
+ .getID().toString()));
+ FileStatus[] fs = null;
+ try {
+ fs = tClient.listStatus(jobIdDir.toString(), true);
+ } catch (Exception ex) {
+ LOG.error("Failed to get the jobIdDir files " + ex);
+ }
+ Assert.assertEquals("Filestatus length is zero", fs.length != 0, true);
+ for (FileStatus file : fs) {
+ try {
+ String filename = file.getPath().getName();
+ if (filename.equals(TaskTracker.JOBFILE)) {
+ if (DefaultTaskController.class.getName().equals(taskController)) {
+ fsPerm = file.getPermission();
+ Assert.assertTrue("FilePermission failed for " + filename, fsPerm
+ .equals(PERM_777));
+ }
+ }
+ if (filename.startsWith("attempt")) {
+ StringBuffer attemptDir = new StringBuffer(jobIdDir);
+ attemptDir.append(Path.SEPARATOR).append(filename);
+ if (tClient.getFileStatus(attemptDir.toString(), true) != null) {
+ FileStatus[] attemptFs =
+ tClient.listStatus(attemptDir.toString(), true, true);
+ for (FileStatus attemptfz : attemptFs) {
+ Assert.assertNotNull("FileStatus is null", attemptfz);
+ fsPerm = attemptfz.getPermission();
+ Assert.assertNotNull("FsPermission is null", fsPerm);
+ if (DefaultTaskController.class.getName().equals(taskController)) {
+ if (!attemptfz.isDir()) {
+ Assert.assertTrue(
+ "FilePermission failed for " + filename, fsPerm
+ .equals(PERM_777));
+ } else {
+ Assert.assertTrue(
+ "FilePermission failed for " + filename, fsPerm
+ .equals(PERM_755));
+ }
+ }
+ }
+ }
+ }
+ if (filename.equals(TaskTracker.TASKJARDIR)) {
+ StringBuffer jarsDir = new StringBuffer(jobIdDir);
+ jarsDir.append(Path.SEPARATOR).append(filename);
+ FileStatus[] jarsFs =
+ tClient.listStatus(jarsDir.toString(), true, true);
+ for (FileStatus jarsfz : jarsFs) {
+ Assert.assertNotNull("FileStatus is null", jarsfz);
+ fsPerm = jarsfz.getPermission();
+ Assert.assertNotNull("File permission is null", fsPerm);
+ if (DefaultTaskController.class.getName().equals(taskController)) {
+ if (!jarsfz.isDir()) {
+ if (jarsfz.getPath().getName().equals("job.jar")) {
+ Assert.assertTrue(
+ "FilePermission failed for " + filename, fsPerm
+ .equals(PERM_777));
+ } else {
+ Assert.assertTrue(
+ "FilePermission failed for " + filename, fsPerm
+ .equals(PERM_644));
+ }
+ } else {
+ Assert.assertTrue(
+ "FilePermission failed for " + filename, fsPerm
+ .equals(PERM_755));
+ }
+ }
+ }
+ }
+ } catch (Exception ex) {
+ LOG.error("The exception occurred while searching for nonexsistent"
+ + "file, ignoring and continuing. " + ex);
+ }
+ }// for loop ends
+ }// for loop ends
+ }
+
+ @AfterClass
+ public static void tearDown() throws java.lang.Exception {
+ cluster.tearDown();
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestJobKill.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestJobKill.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestJobKill.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestJobKill.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import testjar.JobKillCommitter;
+
+public class TestJobKill {
+ private static final Log LOG = LogFactory.getLog(TestJobKill.class);
+ private JTProtocol wovenClient = null;
+ private static Path outDir = new Path("output");
+ private static Path inDir = new Path("input");
+ private static FileSystem fs = null;
+ private static MRCluster cluster;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = MRCluster.createCluster(new Configuration());
+ cluster.setUp();
+ fs = inDir.getFileSystem(cluster.getJTClient().getConf());
+ if(!fs.exists(inDir)){
+ fs.create(inDir);
+ }
+ if (fs.exists(outDir)) {
+ fs.delete(outDir,true);
+ }
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ if(fs.exists(inDir)) {
+ fs.delete(inDir,true);
+ }
+ if (fs.exists(outDir)) {
+ fs.delete(outDir,true);
+ }
+ cluster.tearDown();
+ }
+
+ /*
+ * The intention of this test case is to test job failure due to system
+ * exceptions, so the exceptions are thrown intentionally and the job is
+ * verified for failure. At the end of the test, it is verified that the
+ * success file is not present in the hdfs location, because the success
+ * file should only exist if the actual job had succeeded.
+ *
+ * @throws Exception in a case of test errors
+ */
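+ // The success-file check referred to above, in brief (illustration
+ // only; see checkCleanup below for the actual code):
+ //
+ //   Path ok = new Path(outDir, FileOutputCommitter.SUCCEEDED_FILE_NAME);
+ //   Assert.assertFalse("job failed, so no success file", fs.exists(ok));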
+ @Test
+ public void testSystemJobKill() throws Exception {
+ wovenClient = cluster.getJTClient().getProxy();
+ Configuration conf = new Configuration(cluster.getConf());
+ conf.set(MRJobConfig.MAP_MAX_ATTEMPTS, "1");
+ conf.set(MRJobConfig.REDUCE_MAX_ATTEMPTS, "1");
+ // fail the mapper job
+ failJob(conf, JobKillCommitter.CommitterWithNoError.class, "JobMapperFail",
+ JobKillCommitter.MapperFail.class, JobKillCommitter.ReducerPass.class,
+ false);
+ // fail the reducer job
+ failJob(conf, JobKillCommitter.CommitterWithNoError.class,
+ "JobReducerFail", JobKillCommitter.MapperPass.class,
+ JobKillCommitter.ReducerFail.class,false);
+ // fail the set up job
+ failJob(conf, JobKillCommitter.CommitterWithFailSetup.class,
+ "JobSetupFail", JobKillCommitter.MapperPass.class,
+ JobKillCommitter.ReducerPass.class,false);
+ // fail the clean up job
+ failJob(conf, JobKillCommitter.CommitterWithFailCleanup.class,
+ "JobCleanupFail", JobKillCommitter.MapperPass.class,
+ JobKillCommitter.ReducerPass.class,false);
+ }
+
+ private void failJob(Configuration conf,
+ Class<? extends OutputCommitter> theClass, String confName,
+ Class<? extends Mapper> mapClass, Class<? extends Reducer> redClass,
+ boolean isUserKill)
+ throws Exception {
+ Job job = new Job(conf, confName);
+ job.setJarByClass(JobKillCommitter.class);
+ job.setMapperClass(mapClass);
+ job.setCombinerClass(redClass);
+ job.setMapOutputKeyClass(Text.class);
+ job.setMapOutputValueClass(Text.class);
+ job.setReducerClass(redClass);
+ job.setNumReduceTasks(1);
+ FileInputFormat.addInputPath(job, inDir);
+ FileOutputFormat.setOutputPath(job, outDir);
+ JobConf jconf = new JobConf(job.getConfiguration(), JobKillCommitter.class);
+ jconf.setOutputCommitter(theClass);
+ if (!isUserKill) {
+ RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+ JobID id = rJob.getID();
+ JobInfo jInfo = wovenClient.getJobInfo(id);
+ Assert.assertTrue("Job is not in PREP state",
+ jInfo.getStatus().getRunState() == JobStatus.PREP);
+ } else {
+ //user kill job
+ RunningJob rJob = cluster.getJTClient().getClient().submitJob(jconf);
+ JobInfo info = wovenClient.getJobInfo(rJob.getID());
+ Assert.assertNotNull("Job Info is null",info);
+ JobID id = rJob.getID();
+ while (info.runningMaps() != 1) {
+ Thread.sleep(1000);
+ info = wovenClient.getJobInfo(id);
+ }
+ rJob.killJob();
+ }
+ checkCleanup(jconf);
+ deleteOutputDir();
+ }
+
+ /**
+ * This test kills the job by explicitly calling the kill API
+ * and makes sure the clean up happens
+ * @throws Exception
+ */
+ @Test
+ public void testUserJobKill() throws Exception{
+ wovenClient = cluster.getJTClient().getProxy();
+ Configuration conf = new Configuration(cluster.getConf());
+ conf.set(MRJobConfig.MAP_MAX_ATTEMPTS, "1");
+ conf.set(MRJobConfig.REDUCE_MAX_ATTEMPTS, "1");
+ // fail the mapper job
+ failJob(conf, JobKillCommitter.CommitterWithNoError.class, "JobUserKill",
+ JobKillCommitter.MapperPassSleep.class,
+ JobKillCommitter.ReducerPass.class,true);
+ }
+
+ private void checkCleanup(JobConf conf) throws Exception {
+ if (outDir != null) {
+ if (fs.exists(outDir)) {
+ Path filePath = new Path(outDir,
+ FileOutputCommitter.SUCCEEDED_FILE_NAME);
+ // check to make sure the success file is not there since the job
+ // failed.
+ Assert.assertTrue("The success file is present when the job failed",
+ !fs.exists(filePath));
+ }
+ }
+ }
+
+ private void deleteOutputDir() throws Exception {
+ if (fs != null) {
+ fs.delete(outDir, true);
+ }
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestPushConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestPushConfig.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestPushConfig.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestPushConfig.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+import java.io.File;
+import java.io.FileOutputStream;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+import org.apache.hadoop.test.system.AbstractDaemonClient;
+import org.apache.hadoop.test.system.process.HadoopDaemonRemoteCluster;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPushConfig {
+ private static MRCluster cluster;
+ private String localConfDir = "localconf";
+ private static final Log LOG = LogFactory.getLog(
+ TestPushConfig.class.getName());
+
+ @BeforeClass
+ public static void before() throws Exception {
+ String [] expExcludeList = new String[2];
+ expExcludeList[0] = "java.net.ConnectException";
+ expExcludeList[1] = "java.io.IOException";
+
+ cluster = MRCluster.createCluster(new Configuration());
+ cluster.setExcludeExpList(expExcludeList);
+ cluster.setUp();
+ }
+
+ @AfterClass
+ public static void after() throws Exception {
+ cluster.tearDown();
+ }
+
+ /**
+ * This test exercises the pushConfig feature. The pushConfig functionality
+ * is available as part of the cluster process manager. It takes a local
+ * input directory and pushes all the files from the local to the remote
+ * conf directory. This functionality is required to change the config on
+ * the fly and restart the cluster, which will be used by other test cases.
+ * @throws Exception
+ */
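+ // The push-restart-verify cycle exercised below, in brief (illustration
+ // only; all calls appear in this test):
+ //
+ //   String newDir = cluster.getClusterManager().pushConfig(localDir);
+ //   cluster.stop();
+ //   cluster.getClusterManager().start(newDir); // restart with new conf
+ //   // ...check cluster.getJTClient().getProxy().getDaemonConf()...
+ //   cluster.getClusterManager().stop(newDir);
+ //   cluster.getClusterManager().start(); // back to the original conf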
+ @Test
+ public void testPushConfig() throws Exception {
+ final String DUMMY_CONFIG_STRING = "mapreduce.newdummy.conf";
+ final String DUMMY_CONFIG_STRING_VALUE = "HerriotTestRules";
+ Configuration origconf = new Configuration(cluster.getConf());
+ origconf.set(DUMMY_CONFIG_STRING, DUMMY_CONFIG_STRING_VALUE);
+ String localDir = HadoopDaemonRemoteCluster.getDeployedHadoopConfDir() +
+ File.separator + localConfDir;
+ File lFile = new File(localDir);
+ if(!lFile.exists()){
+ lFile.mkdir();
+ }
+ String mapredConf = localDir + File.separator + "mapred-site.xml";
+ File file = new File(mapredConf);
+ origconf.writeXml(new FileOutputStream(file));
+ Configuration daemonConf = cluster.getJTClient().getProxy().getDaemonConf();
+ Assert.assertTrue("Dummy varialble is expected to be null before restart.",
+ daemonConf.get(DUMMY_CONFIG_STRING) == null);
+ String newDir = cluster.getClusterManager().pushConfig(localDir);
+ cluster.stop();
+ AbstractDaemonClient cli = cluster.getJTClient();
+ // make sure the cluster has actually stopped
+ waitForClusterStop(cli);
+ cluster.getClusterManager().start(newDir);
+ cli = cluster.getJTClient();
+ // make sure the cluster has actually started
+ waitForClusterStart(cli);
+ Configuration newconf = cluster.getJTClient().getProxy().getDaemonConf();
+ Assert.assertTrue("Extra varialble is expected to be set",
+ newconf.get(DUMMY_CONFIG_STRING).equals(DUMMY_CONFIG_STRING_VALUE));
+ cluster.getClusterManager().stop(newDir);
+ cli = cluster.getJTClient();
+ // make sure the cluster has actually stopped
+ waitForClusterStop(cli);
+ // start the daemons with original conf dir
+ cluster.getClusterManager().start();
+ cli = cluster.getJTClient();
+ waitForClusterStart(cli);
+ daemonConf = cluster.getJTClient().getProxy().getDaemonConf();
+ Assert.assertTrue("Dummy variable is expected to be null after restart.",
+ daemonConf.get(DUMMY_CONFIG_STRING) == null);
+ lFile.delete();
+ }
+
+ private void waitForClusterStop(AbstractDaemonClient cli) throws Exception {
+ int i=1;
+ while (i < 40) {
+ try {
+ cli.ping();
+ Thread.sleep(1000);
+ i++;
+ } catch (Exception e) {
+ break;
+ }
+ }
+ for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+ i = 1;
+ while (i < 40) {
+ try {
+ tcli.ping();
+ Thread.sleep(1000);
+ i++;
+ } catch (Exception e) {
+ break;
+ }
+ }
+ if (i >= 40) {
+ Assert.fail("TT on " + tcli.getHostName() + " Should have been down.");
+ }
+ }
+ }
+
+ private void waitForClusterStart(AbstractDaemonClient cli) throws Exception {
+ int i=1;
+ while (i < 40) {
+ try {
+ cli.ping();
+ break;
+ } catch (Exception e) {
+ i++;
+ Thread.sleep(1000);
+ LOG.info("Waiting for Jobtracker on host : "
+ + cli.getHostName() + " to come up.");
+ }
+ }
+ for (AbstractDaemonClient tcli : cluster.getTTClients()) {
+ i = 1;
+ while (i < 40) {
+ try {
+ tcli.ping();
+ break;
+ } catch (Exception e) {
+ i++;
+ Thread.sleep(1000);
+ LOG.info("Waiting for Tasktracker on host : "
+ + tcli.getHostName() + " to come up.");
+ }
+ }
+ }
+ }
+}
Added: hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestSortValidate.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestSortValidate.java?rev=956666&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestSortValidate.java (added)
+++ hadoop/mapreduce/trunk/src/test/system/test/org/apache/hadoop/mapred/TestSortValidate.java Mon Jun 21 19:02:49 2010
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.junit.Assert;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.examples.RandomWriter;
+import org.apache.hadoop.examples.Sort;
+
+import org.apache.hadoop.mapreduce.test.system.JTProtocol;
+import org.apache.hadoop.mapreduce.test.system.JobInfo;
+import org.apache.hadoop.mapreduce.test.system.MRCluster;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * A System test to test the Map-Reduce framework's sort
+ * with a real Map-Reduce Cluster.
+ */
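+
+// The test drives the standard three-stage pipeline (see testMapReduceSort
+// below; illustration only):
+//
+//   RandomWriter                 ->  inputDirectory
+//   Sort:        inputDirectory  ->  outputDirectory
+//   SortValidator -sortInput inputDirectory -sortOutput outputDirectory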
+public class TestSortValidate {
+ // Input/Output paths for sort
+ private static final Path SORT_INPUT_PATH = new Path("inputDirectory");
+ private static final Path SORT_OUTPUT_PATH = new Path("outputDirectory");
+
+ // make it big enough to cause a spill in the map
+ private static final int RW_BYTES_PER_MAP = 3 * 1024 * 1024;
+ private static final int RW_MAPS_PER_HOST = 2;
+
+ private MRCluster cluster = null;
+ private FileSystem dfs = null;
+ private JobClient client = null;
+
+ private static final Log LOG = LogFactory.getLog(TestSortValidate.class);
+
+ public TestSortValidate()
+ throws Exception {
+ cluster = MRCluster.createCluster(new Configuration());
+ }
+
+ @Before
+ public void setUp() throws java.lang.Exception {
+ cluster.setUp();
+ client = cluster.getJTClient().getClient();
+
+ dfs = client.getFs();
+ dfs.delete(SORT_INPUT_PATH, true);
+ dfs.delete(SORT_OUTPUT_PATH, true);
+ }
+
+ @After
+ public void after() throws Exception {
+ cluster.tearDown();
+ dfs.delete(SORT_INPUT_PATH, true);
+ dfs.delete(SORT_OUTPUT_PATH, true);
+ }
+
+ public void runRandomWriter(Configuration job, Path sortInput)
+ throws Exception {
+ // Scale down the default settings for RandomWriter for the test-case
+ // Generates NUM_HADOOP_SLAVES * RW_MAPS_PER_HOST * RW_BYTES_PER_MAP
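+ // e.g. on a 4-slave cluster (size assumed for illustration):
+ // 4 * 2 maps * 3 MB = 24 MB of random input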
+ job.setInt("test.randomwrite.bytes_per_map", RW_BYTES_PER_MAP);
+ job.setInt("test.randomwriter.maps_per_host", RW_MAPS_PER_HOST);
+ String[] rwArgs = {sortInput.toString()};
+
+ runAndVerify(job,new RandomWriter(), rwArgs);
+ }
+
+ private void runAndVerify(Configuration job, Tool tool, String[] args)
+ throws Exception {
+
+ // This records the number of jobs submitted before the new
+ // job gets submitted.
+ int prevJobsNum = 0;
+
+ JTProtocol wovenClient = cluster.getJTClient().getProxy();
+
+ JobStatus[] jobStatus = null;
+ JobID id = null;
+ RunningJob rJob = null;
+ JobInfo jInfo = null;
+
+ //Getting the number of previously submitted jobs.
+ jobStatus = client.getAllJobs();
+ prevJobsNum = jobStatus.length;
+
+ // Run RandomWriter
+ Assert.assertEquals(ToolRunner.run(job, tool, args), 0);
+
+ //Waiting for the job to appear in the jobstatus
+ jobStatus = client.getAllJobs();
+
+ while (jobStatus.length - prevJobsNum == 0) {
+ LOG.info("Waiting for the job to appear in the jobStatus");
+ Thread.sleep(1000);
+ jobStatus = client.getAllJobs();
+ }
+
+ //Getting the jobId of the just submitted job
+ //The just submitted job is always added in the first slot of jobstatus
+ id = jobStatus[0].getJobID();
+
+ rJob = client.getJob(id);
+
+ jInfo = wovenClient.getJobInfo(id);
+
+ //Making sure that the job is complete.
+ while (jInfo != null && !jInfo.getStatus().isJobComplete()) {
+ Thread.sleep(10000);
+ jInfo = wovenClient.getJobInfo(id);
+ }
+
+ cluster.getJTClient().verifyCompletedJob(id);
+ }
+
+ private void runSort(Configuration job, Path sortInput, Path sortOutput)
+ throws Exception {
+
+ job.setInt("io.sort.mb", 1);
+
+ // Setup command-line arguments to 'sort'
+ String[] sortArgs = {sortInput.toString(), sortOutput.toString()};
+
+ runAndVerify(job,new Sort(), sortArgs);
+
+ }
+
+ private void runSortValidator(Configuration job,
+ Path sortInput, Path sortOutput)
+ throws Exception {
+ String[] svArgs = {"-sortInput", sortInput.toString(),
+ "-sortOutput", sortOutput.toString()};
+
+ runAndVerify(job,new SortValidator(), svArgs);
+
+ }
+
+ @Test
+ public void testMapReduceSort() throws Exception {
+ // Run randomwriter to generate input for 'sort'
+ runRandomWriter(cluster.getConf(), SORT_INPUT_PATH);
+
+ // Run sort
+ runSort(cluster.getConf(), SORT_INPUT_PATH, SORT_OUTPUT_PATH);
+
+ // Run sort-validator to check if sort worked correctly
+ runSortValidator(cluster.getConf(), SORT_INPUT_PATH,
+ SORT_OUTPUT_PATH);
+ }
+}