Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/03/17 19:44:24 UTC

svn commit: r1082633 - in /hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src: java/org/apache/hadoop/mapred/gridmix/ test/system/org/apache/hadoop/mapred/gridmix/ test/system/org/apache/hadoop/mapred/gridmix/test/system/

Author: acmurthy
Date: Thu Mar 17 18:44:24 2011
New Revision: 1082633

URL: http://svn.apache.org/viewvc?rev=1082633&view=rev
Log:
System tests for HDFS and local FS based distributed cache emulation feature. Contributed by Vinay Kumar

Modified:
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
    hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java

Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java?rev=1082633&r1=1082632&r2=1082633&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/DistributedCacheEmulator.java Thu Mar 17 18:44:24 2011
@@ -366,7 +366,7 @@ class DistributedCacheEmulator {
    * @return true if the path provided is of a local file system based
    *              distributed cache file
    */
-  private boolean isLocalDistCacheFile(String filePath, String user,
+  static boolean isLocalDistCacheFile(String filePath, String user,
                                        boolean visibility) {
     return (!visibility && filePath.contains(user + "/.staging"));
   }

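A note on the change above: making isLocalDistCacheFile package-private and
static lets the system tests reuse the classification rule. The rule treats a
distributed cache file as local-FS based when it is private (visibility false)
and sits under the submitting user's ".staging" directory. A minimal,
standalone sketch of the same predicate (the sample paths below are
hypothetical, chosen only to exercise both branches):

    public class LocalDistCacheCheck {
      static boolean isLocalDistCacheFile(String filePath, String user,
                                          boolean visibility) {
        // Private file under <user>/.staging => emulated from the local FS.
        return (!visibility && filePath.contains(user + "/.staging"));
      }

      public static void main(String[] args) {
        System.out.println(isLocalDistCacheFile(
            "hdfs://nn:8020/user/alice/.staging/job_1/files/lookup.dat",
            "alice", false));  // true: private and under .staging
        System.out.println(isLocalDistCacheFile(
            "hdfs://nn:8020/shared/dict.txt", "alice", true));  // false: public
      }
    }
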
Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java?rev=1082633&r1=1082632&r2=1082633&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/GridmixSystemTestCase.java Thu Mar 17 18:44:24 2011
@@ -74,10 +74,11 @@ public class GridmixSystemTestCase {
   @AfterClass
   public static void after() throws Exception {
     UtilsForGridmix.cleanup(gridmixDir, rtClient.getDaemonConf());
-    org.apache.hadoop.fs.FileUtil.fullyDelete(new java.io.File("/tmp/gridmix-st"));
+    org.apache.hadoop.fs.FileUtil.fullyDelete(new java.io.File(System.
+        getProperty("java.io.tmpdir") + "/gridmix-st/"));
     cluster.tearDown();
-    if (gridmixJS.getJobConf().get("gridmix.user.resolve.class").
-        contains("RoundRobin")) {
+    if (gridmixJS != null && gridmixJS.getJobConf().
+        get("gridmix.user.resolve.class"). contains("RoundRobin")) {
        List<String> proxyUsers = UtilsForGridmix.
            listProxyUsers(gridmixJS.getJobConf(),
            UserGroupInformation.getLoginUser().getShortUserName());
@@ -97,19 +98,19 @@ public class GridmixSystemTestCase {
   
   public static void runGridmixAndVerify(String [] runtimeValues, 
       String [] otherValues, String tracePath, int mode) throws Exception {
-    jobids = runGridmix(runtimeValues, otherValues, mode);
+    List<JobID> jobids = runGridmix(runtimeValues, otherValues, mode);
     gridmixJV = new GridmixJobVerification(
         new Path(tracePath), gridmixJS.getJobConf(), jtClient);
     gridmixJV.verifyGridmixJobsWithJobStories(jobids);  
   }
-  
+
   public static List<JobID> runGridmix(String[] runtimeValues, 
      String[] otherValues, int mode) throws Exception {
     gridmixJS = new GridmixJobSubmission(rtClient.getDaemonConf(),
        jtClient, gridmixDir);
     gridmixJS.submitJobs(runtimeValues, otherValues, mode);
-    jobids = UtilsForGridmix.listGridmixJobIDs(jtClient.getClient(), 
-       gridmixJS.getGridmixJobCount());
+    List<JobID> jobids = UtilsForGridmix.listGridmixJobIDs(
+       jtClient.getClient(), gridmixJS.getGridmixJobCount());
     return jobids;
   }
   
@@ -126,4 +127,10 @@ public class GridmixSystemTestCase {
     }
     return null;
   }
+
+  public static boolean isLocalDistCache(String fileName, String userName, 
+     boolean visibility) {
+    return DistributedCacheEmulator.isLocalDistCacheFile(fileName, 
+        userName, visibility);
+  }
 }

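Two small hardening patterns show up in the hunk above: the cleanup path is
derived from java.io.tmpdir instead of a hard-coded /tmp, and gridmixJS is
null-checked before use, since it is only set once a job has been submitted.
A standalone sketch of both patterns (the resolver value below is a stand-in,
not read from a live configuration):

    import java.io.File;

    public class CleanupSketch {
      public static void main(String[] args) {
        // Resolve the platform temp directory rather than assuming /tmp.
        File stDir = new File(System.getProperty("java.io.tmpdir"), "gridmix-st");
        System.out.println("Directory to delete: " + stDir.getAbsolutePath());

        // Guard against a null reference before chaining calls on it.
        String resolverClass = null;  // e.g. conf.get("gridmix.user.resolve.class")
        if (resolverClass != null && resolverClass.contains("RoundRobin")) {
          System.out.println("RoundRobin user resolver was in effect.");
        }
      }
    }
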
Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java?rev=1082633&r1=1082632&r2=1082633&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/TestGridMixDataGeneration.java Thu Mar 17 18:44:24 2011
@@ -177,9 +177,10 @@ public class TestGridMixDataGeneration {
        dataSize + 0.1 > inputSize || dataSize - 0.1 < inputSize);
  
     JobClient jobClient = jtClient.getClient();
+    int len = jobClient.getAllJobs().length;
     LOG.info("Verify the job status after completion of job.");
     Assert.assertEquals("Job has not succeeded.", JobStatus.SUCCEEDED, 
-       jobClient.getAllJobs()[0].getRunState());
+       jobClient.getAllJobs()[len-1].getRunState());
   }
   
   private void verifyEachNodeSize(Path inputDir) throws IOException {

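The data-generation test above now verifies the most recently submitted job
instead of whatever happens to be first in getAllJobs(), which matters once a
run submits more than one job. A standalone sketch of the indexing change,
assuming (as the patch's len - 1 indexing implies) that the latest job sits at
the end of the array; the run states below are hypothetical and 1 stands in
for JobStatus.SUCCEEDED:

    public class LatestJobState {
      public static void main(String[] args) {
        // All jobs known to the client, oldest first.
        int[] runStates = {1, 1, 1};
        int len = runStates.length;
        // Check the latest job, not runStates[0].
        if (runStates[len - 1] != 1) {
          throw new AssertionError("Job has not succeeded.");
        }
        System.out.println("Latest job run state: " + runStates[len - 1]);
      }
    }
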
Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java?rev=1082633&r1=1082632&r2=1082633&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridMixConfig.java Thu Mar 17 18:44:24 2011
@@ -108,4 +108,29 @@ public class GridMixConfig {
    */
   public static final String GRIDMIX_DISTCACHE_ENABLE = 
       "gridmix.distributed-cache-emulation.enable";
+
+  /**
+   * Gridmix distributed cache visibilities.
+   */
+  public static final String GRIDMIX_DISTCACHE_VISIBILITIES =
+    "mapreduce.job.cache.files.visibilities";
+  
+  /**
+   * Gridmix distributed cache files.
+   */
+  public static final String GRIDMIX_DISTCACHE_FILES = 
+    "mapreduce.job.cache.files";
+  
+  /**
+   * Gridmix distributed cache files size.
+   */
+  public static final String GRIDMIX_DISTCACHE_FILESSIZE =
+    "mapreduce.job.cache.files.filesizes";
+
+  /**
+   * Gridmix distributed cache files time stamp.
+   */
+  public static final String GRIDMIX_DISTCACHE_TIMESTAMP =
+    "mapreduce.job.cache.files.timestamps";
 }
+

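The four keys added above expose the framework's mapreduce.job.cache.files*
properties to the tests; each value is a comma-separated list, with entry i of
every list describing the same cache file. A standalone sketch of splitting
and correlating them (the property values below are hypothetical):

    public class DistCacheProps {
      public static void main(String[] args) {
        // Hypothetical values as JobConf.get(...) would return them.
        String files = "hdfs://nn/shared/a.txt,hdfs://nn/shared/b.txt";
        String[] sizes = "1024,2048".split(",");
        String[] stamps = "1300000000000,1300000000001".split(",");
        String[] visibilities = "true,false".split(",");

        String[] paths = files.split(",");
        for (int i = 0; i < paths.length; i++) {
          // The lists are parallel: index i lines up across all four.
          System.out.println(paths[i] + " size=" + sizes[i]
              + " timestamp=" + stamps[i] + " public=" + visibilities[i]);
        }
      }
    }
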
Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java?rev=1082633&r1=1082632&r2=1082633&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/GridmixJobVerification.java Thu Mar 17 18:44:24 2011
@@ -18,17 +18,25 @@
 package org.apache.hadoop.mapred.gridmix.test.system;
 
 import java.io.IOException;
+import java.io.File;
 import java.util.Iterator;
 import java.util.List;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Map;
 import java.util.HashMap;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.Set;
+import java.util.Collections;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.Counter;
@@ -45,6 +53,7 @@ import org.apache.hadoop.tools.rumen.Tas
 import org.junit.Assert;
 import java.text.ParseException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapred.gridmix.GridmixSystemTestCase;
 /**
  * Verifying each Gridmix job with corresponding job story in a trace file.
  */
@@ -54,6 +63,8 @@ public class GridmixJobVerification {
   private Path path;
   private Configuration conf;
   private JTClient jtClient;
+  private Map<String, List<JobConf>> simuAndOrigJobsInfo = 
+      new HashMap<String, List<JobConf>>();
   /**
    * Gridmix job verification constructor
    * @param path - path of the gridmix output directory.
@@ -76,11 +87,12 @@ public class GridmixJobVerification {
   public void verifyGridmixJobsWithJobStories(List<JobID> jobids) 
       throws IOException, ParseException {
 
-    List<Long> origSubmissionTime = new ArrayList<Long>();
-    List<Long> simuSubmissionTime = new ArrayList<Long>();
+    SortedMap <Long, String> origSubmissionTime = new TreeMap <Long, String>();
+    SortedMap <Long, String> simuSubmissionTime = new TreeMap<Long, String>();
     GridmixJobStory gjs = new GridmixJobStory(path, conf);
     final Iterator<JobID> ite = jobids.iterator();
-    java.io.File destFolder = new java.io.File("/tmp/gridmix-st/");
+    File destFolder = new File(System.getProperty("java.io.tmpdir") + 
+        "/gridmix-st/");
 
     destFolder.mkdir();
     while (ite.hasNext()) {
@@ -93,129 +105,491 @@ public class GridmixJobVerification {
       long expReduceInputRecs = 0;
       long expReduceOutputRecs = 0;
       
-      JobID currJobId = ite.next();
-      String historyFilePath = jtClient.getProxy().
-          getJobHistoryLocationForRetiredJob(currJobId);
-      Path jhpath = new Path(historyFilePath);
-      FileSystem fs = jhpath.getFileSystem(conf);
-      JobHistoryParser jhparser = new JobHistoryParser(fs, jhpath);
-      JobHistoryParser.JobInfo jhInfo = jhparser.parse();
+      JobID simuJobId = ite.next();
+      JobHistoryParser.JobInfo jhInfo = getSimulatedJobHistory(simuJobId);
+      Assert.assertNotNull("Job history not found.", jhInfo);
       Counters counters = jhInfo.getTotalCounters();
-
-      fs.copyToLocalFile(jhpath,new Path(destFolder.toString()));
-      fs.copyToLocalFile(new Path(historyFilePath + "_conf.xml"), 
-          new Path(destFolder.toString()));
-      JobConf jobConf = new JobConf(conf);
-      jobConf.addResource(new Path("/tmp/gridmix-st/" + 
-          currJobId + "_conf.xml"));
-      String origJobId = jobConf.get("gridmix.job.original-job-id");
+      JobConf simuJobConf = getSimulatedJobConf(simuJobId,destFolder);
+      String origJobId = simuJobConf.get("gridmix.job.original-job-id");
       LOG.info("OriginalJobID<->CurrentJobID:" + 
-          origJobId + "<->" + currJobId);
+          origJobId + "<->" + simuJobId);
 
       ZombieJob zombieJob = gjs.getZombieJob(JobID.forName(origJobId));
-      LoggedJob loggedJob = zombieJob.getLoggedJob();
-      
-      for (int index = 0; index < zombieJob.getNumberMaps(); index ++) {
-        TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
-        expMapInputBytes += mapTask.getInputBytes();
-        expMapOutputBytes += mapTask.getOutputBytes();
-        expMapInputRecs += mapTask.getInputRecords();
-        expMapOutputRecs += mapTask.getOutputRecords();
-      }
+      Map<String, Long> mapJobCounters = getJobMapCounters(zombieJob);
+      Map<String, Long> reduceJobCounters = getJobReduceCounters(zombieJob);
 
-      for (int index = 0; index < zombieJob.getNumberReduces(); index ++) {
-        TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
-        expReduceInputBytes += reduceTask.getInputBytes();
-        expReduceOutputBytes += reduceTask.getOutputBytes();
-        expReduceInputRecs += reduceTask.getInputRecords();
-        expReduceOutputRecs += reduceTask.getOutputRecords();
+      if (simuJobConf.get("gridmix.job-submission.policy").contains("REPLAY")) {
+          origSubmissionTime.put(zombieJob.getSubmissionTime(), 
+              origJobId.toString() + "^" + simuJobId); 
+          simuSubmissionTime.put(jhInfo.getSubmitTime(), 
+              origJobId.toString() + "^" + simuJobId);
       }
 
-      LOG.info("Verifying the job <" + currJobId + "> and wait for a while...");
-      Assert.assertEquals("Job id has not matched",
-          zombieJob.getJobID(), JobID.forName(origJobId));
-
-      Assert.assertEquals("Job maps have not matched", 
-          zombieJob.getNumberMaps(), 
-          jhInfo.getTotalMaps());
-
-      if (!jobConf.getBoolean("gridmix.sleep.maptask-only",false)) {
-        Assert.assertEquals("Job reducers have not matched",
-            zombieJob.getNumberReduces(), jhInfo.getTotalReduces());
+      LOG.info("Verifying the job <" + simuJobId + "> and wait for a while...");
+      verifySimulatedJobSummary(zombieJob, jhInfo, simuJobConf);
+      verifyJobMapCounters(counters,mapJobCounters,simuJobConf);
+      verifyJobReduceCounters(counters,reduceJobCounters,simuJobConf); 
+      verifyDistributeCache(zombieJob,simuJobConf);
+      setJobDistributedCacheInfo(simuJobId.toString(), simuJobConf, 
+         zombieJob.getJobConf());
+      LOG.info("Done.");
+    }
+    verifyDistributedCacheBetweenJobs(simuAndOrigJobsInfo);
+  }
+
+  /**
+   * Verify the distributed cache files between the jobs in a gridmix run.
+   * @param jobsInfo - jobConfs of simulated and original jobs as a map.
+   */
+  public void verifyDistributedCacheBetweenJobs(Map<String, 
+      List<JobConf>> jobsInfo) {
+     if (jobsInfo.size() > 1) {
+       Map<String, Integer> simJobfilesOccurBtnJobs = 
+          getDistcacheFilesOccurenceBetweenJobs(jobsInfo, 0);
+       Map<String, Integer> origJobfilesOccurBtnJobs = 
+           getDistcacheFilesOccurenceBetweenJobs(jobsInfo, 1);
+       List<Integer> simuOccurList = 
+           getMapValuesAsList(simJobfilesOccurBtnJobs);
+       Collections.sort(simuOccurList);
+       List<Integer> origOccurList = 
+           getMapValuesAsList(origJobfilesOccurBtnJobs);
+       Collections.sort(origOccurList);
+       Assert.assertTrue("The unique count of distibuted cache files in " + 
+           "simulated jobs have not matched with the unique count of " + 
+           "original jobs distributed files ", 
+           simuOccurList.size() == origOccurList.size());
+       int index = 0;
+       for (Integer origDistFileCount : origOccurList) {
+         Assert.assertTrue("Distributed cache file reuse in simulated " +
+             "jobs has not matched the reuse of distributed cache files " + 
+             "in original jobs.", origDistFileCount.intValue() == 
+             simuOccurList.get(index).intValue());
+         index ++;
+       }
+     }
+  }
+  
+  private List<Integer> getMapValuesAsList(Map<String,Integer> jobOccurs) {
+    List<Integer> occursList = new ArrayList<Integer>();
+    Set<String> files = jobOccurs.keySet();
+    Iterator<String > ite = files.iterator();
+    while(ite.hasNext()) {
+      String file = ite.next(); 
+      occursList.add(jobOccurs.get(file));
+    }
+    return occursList;
+  }
+  
+
+  /**
+   * Get the unique distributed cache files and their occurrences across jobs.
+   * @param jobsInfo - job's configurations as a map.
+   * @param jobConfIndex - 0 for simulated job configuration and 
+   *                       1 for original jobs configuration.
+   * @return  - unique distributed cache files and occurrences as map.
+   */
+  private Map<String, Integer> getDistcacheFilesOccurenceBetweenJobs(
+      Map<String, List<JobConf>> jobsInfo, int jobConfIndex) {
+    Map<String,Integer> filesOccurBtnJobs = new HashMap <String,Integer>();
+    Set<String> jobIds = jobsInfo.keySet();
+    Iterator<String > ite = jobIds.iterator();
+    while(ite.hasNext()){
+        String jobId = ite.next();
+        List<JobConf> jobconfs = jobsInfo.get(jobId);
+        String [] distCacheFiles = jobconfs.get(jobConfIndex).
+        get(GridMixConfig.GRIDMIX_DISTCACHE_FILES).split(",");
+        String [] distCacheFileTimeStamps = jobconfs.get(jobConfIndex).
+        get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
+        String [] distCacheFileVisib = jobconfs.get(jobConfIndex).
+        get(GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
+        int indx = 0;
+        for (String distCacheFile : distCacheFiles) {
+          String fileAndSize = distCacheFile + "^" + 
+              distCacheFileTimeStamps[indx] + "^" +  
+              jobconfs.get(jobConfIndex).getUser();
+          if (filesOccurBtnJobs.get(fileAndSize)!= null) {
+            int count = filesOccurBtnJobs.get(fileAndSize);
+            count ++;
+            filesOccurBtnJobs.put(fileAndSize,count);
+          } else {
+            filesOccurBtnJobs.put(fileAndSize,1);
+          }
+          indx ++;
+        }
+    }
+    return filesOccurBtnJobs;
+  }
+  
+  private void setJobDistributedCacheInfo(String jobId, JobConf simuJobConf, 
+     JobConf origJobConf) {
+    if (simuJobConf.get(GridMixConfig.GRIDMIX_DISTCACHE_FILES)!= null) {
+      List<JobConf> jobConfs = new ArrayList<JobConf>();
+      jobConfs.add(simuJobConf);
+      jobConfs.add(origJobConf);
+      simuAndOrigJobsInfo.put(jobId,jobConfs);
+    }
+  }
+
+  /**
+   * Verify the job submission order between the jobs in replay mode.
+   * @param origSubmissionTime - sorted map of original jobs submission times.
+   * @param simuSubmissionTime - sorted map of simulated jobs submission times.
+   */
+  public void verifyJobSumissionTime(SortedMap<Long, String> origSubmissionTime, 
+     SortedMap<Long, String> simuSubmissionTime) {
+    Assert.assertTrue("Simulated jobs' submission time count has " + 
+       "not matched the original jobs' submission time count.", 
+       origSubmissionTime.size() == simuSubmissionTime.size());
+    Iterator<String> origIte = origSubmissionTime.values().iterator();
+    Iterator<String> simuIte = simuSubmissionTime.values().iterator();
+    while (origIte.hasNext()) {
+      Assert.assertEquals("Simulated jobs have not been submitted in the " + 
+          "same order as the original jobs were submitted in REPLAY mode.", 
+          origIte.next(), simuIte.next());
+    }
+  }
+
+  /**
+   * It verifies the distributed cache emulation of a job.
+   * @param zombieJob - Original job story.
+   * @param simuJobConf - Simulated job configuration.
+   * @throws IOException - If an I/O error occurs.
+   */
+  public void verifyDistributeCache(ZombieJob zombieJob, 
+      JobConf simuJobConf) throws IOException {
+
+    if (simuJobConf.getBoolean(GridMixConfig.GRIDMIX_DISTCACHE_ENABLE, false)) {
+      JobConf origJobConf = zombieJob.getJobConf();
+      checkFileVisibility(simuJobConf);
+      checkDistcacheFiles(simuJobConf,origJobConf);
+      checkFileSizes(simuJobConf,origJobConf);
+      checkFileStamps(simuJobConf,origJobConf);
+    } else {
+      Assert.assertNull("Configuration has distributed cache visibilites" +
+          "without enabled distributed cache emulation.", simuJobConf.
+          get(GridMixConfig.GRIDMIX_DISTCACHE_VISIBILITIES));
+      Assert.assertNull("Configuration has distributed cache files time " +
+          "stamps without enabled distributed cache emulation.",simuJobConf.
+          get(GridMixConfig.GRIDMIX_DISTCACHE_TIMESTAMP));
+      Assert.assertNull("Configuration has distributed cache files paths" +
+          "without enabled distributed cache emulation.",simuJobConf.
+          get(GridMixConfig.GRIDMIX_DISTCACHE_FILES));
+      Assert.assertNull("Configuration has distributed cache files sizes" +
+          "without enabled distributed cache emulation.",simuJobConf.
+          get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE));
+    }  
+  }
+
+  private void checkFileStamps(JobConf simuJobConf, JobConf origJobConf) {
+    //Verify simulated jobs against distributed cache files time stamps.
+    String [] origDCFTS = origJobConf.get(GridMixConfig.
+        GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
+    String [] simuDCFTS = simuJobConf.get(GridMixConfig.
+        GRIDMIX_DISTCACHE_TIMESTAMP).split(",");
+    for (int index = 0; index < origDCFTS.length; index++) {
+       Assert.assertTrue("Invalid time stamps between original " +
+           "and simulated job",Long.parseLong(origDCFTS[index]) < 
+           Long.parseLong(simuDCFTS[index]));
+    }
+  }
+
+  private void checkFileVisibility(JobConf simuJobConf) {
+    // Verify simulated jobs against distributed cache files visibilities.
+    String [] distFiles = simuJobConf.get(GridMixConfig.
+        GRIDMIX_DISTCACHE_FILES).split(",");
+    String [] simuDistVisibilities = simuJobConf.get(GridMixConfig.
+        GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
+    List<Boolean> expFileVisibility = new ArrayList<Boolean >();
+    int index = 0;
+    for (String distFile : distFiles) {
+      if (!GridmixSystemTestCase.isLocalDistCache(distFile,
+          simuJobConf.getUser(),Boolean.valueOf(simuDistVisibilities[index]))) {
+        expFileVisibility.add(true);
       } else {
-        Assert.assertEquals("Job reducers have not matched",
-            0, jhInfo.getTotalReduces());
+        expFileVisibility.add(false);
       }
-
-      Assert.assertEquals("Job status has not matched.", 
-          zombieJob.getOutcome().name(), 
-          convertJobStatus(jhInfo.getJobStatus()));
-
-      Assert.assertEquals("Job priority has not matched.", 
-         loggedJob.getPriority().toString(), jhInfo.getPriority());
-
-      if (jobConf.get("gridmix.user.resolve.class").contains("RoundRobin")) {
-         Assert.assertTrue(currJobId + "has not impersonate with other user.",
-             !jhInfo.getUsername().equals(UserGroupInformation.
-             getLoginUser().getShortUserName()));
+      index ++;
+    }
+    index = 0;
+    for (String actFileVisibility :  simuDistVisibilities) {
+      Assert.assertEquals("Simulated job distributed cache file " +
+          "visibilities has not matched.", 
+           expFileVisibility.get(index),Boolean.valueOf(actFileVisibility));
+      index ++;
+    }
+  }
+  
+  private void checkDistcacheFiles(JobConf simuJobConf, JobConf origJobConf) 
+      throws IOException {
+    //Verify simulated jobs against distributed cache files.
+    String [] origDistFiles = origJobConf.get(GridMixConfig.
+            GRIDMIX_DISTCACHE_FILES).split(",");
+    String [] simuDistFiles = simuJobConf.get(GridMixConfig.
+            GRIDMIX_DISTCACHE_FILES).split(",");
+    String [] simuDistVisibilities = simuJobConf.get(GridMixConfig.
+        GRIDMIX_DISTCACHE_VISIBILITIES).split(",");
+    Assert.assertEquals("No. of simulatued job's distcache files " +
+          "haven't matched with no.of original job's distcache files", 
+          origDistFiles.length, simuDistFiles.length);
+
+    int index = 0;
+    for (String simDistFile : simuDistFiles) {
+      Path distPath = new Path(simDistFile);
+      if (!GridmixSystemTestCase.isLocalDistCache(simDistFile, 
+          simuJobConf.getUser(),Boolean.valueOf(simuDistVisibilities[index]))) {
+        FileSystem fs = distPath.getFileSystem(conf);
+        FileStatus fstat = fs.getFileStatus(distPath);
+        FsPermission permission = fstat.getPermission();
+          Assert.assertTrue("HDFS distributed cache file has wrong " + 
+              "permissions for users.", FsAction.READ_WRITE.SYMBOL == 
+              permission.getUserAction().SYMBOL);
+          Assert.assertTrue("HDFS distributed cache file has wrong " + 
+              "permissions for groups.",FsAction.READ.SYMBOL == 
+              permission.getGroupAction().SYMBOL);
+          Assert.assertTrue("HDSFS distributed cache file has wrong " + 
+              "permissions for others.",FsAction.READ.SYMBOL == 
+              permission.getOtherAction().SYMBOL);
       }
+      index ++;
+    }
+  }
 
-      if (jobConf.get("gridmix.job-submission.policy").contains("REPLAY")) {
-        origSubmissionTime.add(zombieJob.getSubmissionTime()); 
-        simuSubmissionTime.add(jhInfo.getSubmitTime());
-      }
+  private void checkFileSizes(JobConf simuJobConf, JobConf origJobConf) {
+    // Verify simulated jobs against distributed cache files size.
+    List<String> origDistFilesSize = Arrays.asList(origJobConf.
+        get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(","));
+    Collections.sort(origDistFilesSize);
+    List<String> simuDistFilesSize = Arrays.asList(simuJobConf.
+        get(GridMixConfig.GRIDMIX_DISTCACHE_FILESSIZE).split(","));
+    Collections.sort(simuDistFilesSize);
+    Assert.assertEquals("Simulated job's file size list has not " + 
+        "matched with the Original job's file size list.",
+        origDistFilesSize.size(),
+        simuDistFilesSize.size());
+    for ( int index = 0; index < origDistFilesSize.size(); index ++ ) {
+       Assert.assertEquals("Simulated job distcache file size has not " +
+        "matched with original job distcache file size.", 
+        origDistFilesSize.get(index), simuDistFilesSize.get(index));
+    }
+  }
 
-      if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
-        
-        //The below statements have commented due to a bug(MAPREDUCE-2135).
-      /*  Assert.assertTrue("Map input bytes have not matched.<exp:[" + 
-            convertBytes(expMapInputBytes) +"]><act:[" + 
-            convertBytes(getCounterValue(counters,"HDFS_BYTES_READ")) + "]>", 
-            convertBytes(expMapInputBytes).equals( 
-            convertBytes(getCounterValue(counters,"HDFS_BYTES_READ"))));
-
-        Assert.assertTrue("Map output bytes has not matched.<exp:[" +
-            convertBytes(expMapOutputBytes) + "]><act:[" +
-            convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES")) + "]>", 
-            convertBytes(expMapOutputBytes).equals( 
-            convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES"))));*/
-
-        Assert.assertEquals("Map input records have not matched.<exp:[" +
-            expMapInputRecs + "]><act:[" +
-            getCounterValue(counters, "MAP_INPUT_RECORDS") + "]>", 
-            expMapInputRecs, getCounterValue(counters, "MAP_INPUT_RECORDS"));
-
-        // The below statements have commented due to a bug(MAPREDUCE-2154).
-        /*Assert.assertEquals("Map output records have not matched.<exp:[" +
-            expMapOutputRecs + "]><act:[" +
-            getCounterValue(counters, "MAP_OUTPUT_RECORDS") + "]>", 
-            expMapOutputRecs, getCounterValue(counters, "MAP_OUTPUT_RECORDS"));*/
-
-        /*Assert.assertTrue("Reduce input bytes have not matched.<exp:[" +
-            convertBytes(expReduceInputBytes) + "]><act:[" +
-            convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES")) + "]>", 
-            convertBytes(expReduceInputBytes).equals( 
-            convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES"))));*/ 
-
-        /*Assert.assertEquals("Reduce output bytes have not matched.<exp:[" + 
-            convertBytes(expReduceOutputBytes) + "]><act:[" +
-            convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN")) + "]>", 
-            convertBytes(expReduceOutputBytes).equals( 
-            convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN"))));*/
-
-        /*Assert.assertEquals("Reduce output records have not matched.<exp:[" + 
-            expReduceOutputRecs + "]><act:[" + getCounterValue(counters,
-            "REDUCE_OUTPUT_RECORDS") + "]>", 
-            expReduceOutputRecs, getCounterValue(counters,
-            "REDUCE_OUTPUT_RECORDS"));*/ 
- 
-         /*Assert.assertEquals("Reduce input records have not matched.<exp:[" + 
-            expReduceInputRecs + "]><act:[" + getCounterValue(counters,
-            "REDUCE_INPUT_RECORDS") + "]>",
-            expReduceInputRecs, 
-            getCounterValue(counters,"REDUCE_INPUT_RECORDS"));*/
-        LOG.info("Done.");
-      }
+  /**
+   * It verifies the simulated job map counters.
+   * @param counters - Simulated job counters from the job history.
+   * @param mapCounters - Original job map counters from the trace.
+   * @param jobConf - Simulated job configuration.
+   * @throws ParseException - If a parser error occurs.
+   */
+  public void verifyJobMapCounters(Counters counters, 
+     Map<String,Long> mapCounters, JobConf jobConf) throws ParseException {
+    if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
+    // The statements below have been commented out due to a bug (MAPREDUCE-2135).
+    /*Assert.assertTrue("Map input bytes have not matched.<exp:[" +
+          convertBytes(mapCounters.get("MAP_INPUT_BYTES").longValue()) +"]><act:[" +
+          convertBytes(getCounterValue(counters,"HDFS_BYTES_READ")) + "]>",
+          convertBytes(mapCounters.get("MAP_INPUT_BYTES").longValue()).equals(
+          convertBytes(getCounterValue(counters,"HDFS_BYTES_READ"))));
+
+    Assert.assertTrue("Map output bytes has not matched.<exp:[" +
+        convertBytes(mapCounters.get("MAP_OUTPUT_BYTES").longValue()) + "]><act:[" +
+        convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES")) + "]>",
+        convertBytes(mapCounters.get("MAP_OUTPUT_BYTES").longValue()).equals(
+        convertBytes(getCounterValue(counters, "MAP_OUTPUT_BYTES"))));*/
+
+    Assert.assertEquals("Map input records have not matched.<exp:[" +
+        mapCounters.get("MAP_INPUT_RECS").longValue() + "]><act:[" +
+        getCounterValue(counters, "MAP_INPUT_RECORDS") + "]>",
+        mapCounters.get("MAP_INPUT_RECS").longValue(), 
+        getCounterValue(counters, "MAP_INPUT_RECORDS"));
+
+    // The statements below have been commented out due to a bug (MAPREDUCE-2154).
+    /*Assert.assertEquals("Map output records have not matched.<exp:[" +
+          mapCounters.get("MAP_OUTPUT_RECS").longValue() + "]><act:[" +
+          getCounterValue(counters, "MAP_OUTPUT_RECORDS") + "]>",
+          mapCounters.get("MAP_OUTPUT_RECS").longValue(), 
+          getCounterValue(counters, "MAP_OUTPUT_RECORDS"));*/
+    } else {
+      Assert.assertTrue("Map Input Bytes are zero", 
+          getCounterValue(counters,"HDFS_BYTES_READ") != 0);
+      Assert.assertNotNull("Map Input Records are zero", 
+          getCounterValue(counters, "MAP_INPUT_RECORDS")!=0);
+    }
+  }
+
+  /**
+   * It verifies the simulated job reduce counters.
+   * @param counters - Simulated job counters from the job history.
+   * @param reduceCounters - Original job reduce counters from the trace.
+   * @param jobConf - Simulated job configuration.
+   * @throws ParseException - If a parser error occurs.
+   */
+  public void verifyJobReduceCounters(Counters counters, 
+     Map<String,Long> reduceCounters, JobConf jobConf) throws ParseException {
+    if (!jobConf.get("gridmix.job.type", "LOADJOB").equals("SLEEPJOB")) {
+    /*Assert.assertTrue("Reduce input bytes have not matched.<exp:[" +
+          convertBytes(reduceCounters.get("REDUCE_INPUT_BYTES").longValue()) + "]><act:[" +
+          convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES")) + "]>",
+          convertBytes(reduceCounters.get("REDUCE_INPUT_BYTES").longValue()).equals(
+          convertBytes(getCounterValue(counters,"REDUCE_SHUFFLE_BYTES"))));*/
+
+    /*Assert.assertEquals("Reduce output bytes have not matched.<exp:[" +
+          convertBytes(reduceCounters.get("REDUCE_OUTPUT_BYTES").longValue()) + "]><act:[" +
+          convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN")) + "]>",
+          convertBytes(reduceCounters.get("REDUCE_OUTPUT_BYTES").longValue()).equals(
+          convertBytes(getCounterValue(counters,"HDFS_BYTES_WRITTEN"))));*/
+
+    /*Assert.assertEquals("Reduce output records have not matched.<exp:[" +
+          reduceCounters.get("REDUCE_OUTPUT_RECS").longValue() + "]><act:[" + getCounterValue(counters,
+          "REDUCE_OUTPUT_RECORDS") + "]>",
+          reduceCounters.get("REDUCE_OUTPUT_RECS").longValue(), getCounterValue(counters,
+          "REDUCE_OUTPUT_RECORDS"));*/
+
+    /*Assert.assertEquals("Reduce input records have not matched.<exp:[" +
+          reduceCounters.get("REDUCE_INPUT_RECS").longValue() + "]><act:[" + getCounterValue(counters,
+          "REDUCE_INPUT_RECORDS") + "]>",
+          reduceCounters.get("REDUCE_INPUT_RECS").longValue(),
+          getCounterValue(counters,"REDUCE_INPUT_RECORDS"));*/
+    } else {
+      Assert.assertTrue("Reduce output records are not zero for sleep job.",
+          getCounterValue(counters, "REDUCE_OUTPUT_RECORDS") == 0);
+      Assert.assertTrue("Reduce output bytes are not zero for sleep job.", 
+          getCounterValue(counters,"HDFS_BYTES_WRITTEN") == 0);
+    }
+  }
+
+  /**
+   * It verifies the gridmix simulated job summary.
+   * @param zombieJob - Original job summary.
+   * @param jhInfo  - Simulated job history info.
+   * @param jobConf - simulated job configuration.
+   * @throws IOException - if an I/O error occurs.
+   */
+  public void verifySimulatedJobSummary(ZombieJob zombieJob, 
+     JobHistoryParser.JobInfo jhInfo, JobConf jobConf) throws IOException {
+    Assert.assertEquals("Job id has not matched",
+        zombieJob.getJobID(), JobID.forName(
+        jobConf.get("gridmix.job.original-job-id")));
+
+    Assert.assertEquals("Job maps have not matched", 
+        zombieJob.getNumberMaps(), jhInfo.getTotalMaps());
+
+    if (!jobConf.getBoolean("gridmix.sleep.maptask-only",false)) {
+      Assert.assertEquals("Job reducers have not matched",
+          zombieJob.getNumberReduces(), jhInfo.getTotalReduces());
+    } else {
+      Assert.assertEquals("Job reducers have not matched",
+          0, jhInfo.getTotalReduces());
+    }
+
+    Assert.assertEquals("Job status has not matched.", 
+        zombieJob.getOutcome().name(), 
+        convertJobStatus(jhInfo.getJobStatus()));
+
+    LoggedJob loggedJob = zombieJob.getLoggedJob();
+    Assert.assertEquals("Job priority has not matched.", 
+        loggedJob.getPriority().toString(), jhInfo.getPriority());
+
+    if (jobConf.get("gridmix.user.resolve.class").contains("RoundRobin")) {
+       Assert.assertTrue(jhInfo.getJobId().toString() + 
+           " has not impersonated another user.", !jhInfo.getUsername()
+           .equals(UserGroupInformation.getLoginUser().getShortUserName()));
+    }
+  }
+
+  /**
+   * Get the original job map counters from a trace.
+   * @param zombieJob - Original job story.
+   * @return - map counters as a map.
+   */
+  public Map<String, Long> getJobMapCounters(ZombieJob zombieJob) {
+    long expMapInputBytes = 0;
+    long expMapOutputBytes = 0;
+    long expMapInputRecs = 0;
+    long expMapOutputRecs = 0;
+    Map<String,Long> mapCounters = new HashMap<String,Long>();
+    for (int index = 0; index < zombieJob.getNumberMaps(); index ++) {
+      TaskInfo mapTask = zombieJob.getTaskInfo(TaskType.MAP, index);
+      expMapInputBytes += mapTask.getInputBytes();
+      expMapOutputBytes += mapTask.getOutputBytes();
+      expMapInputRecs += mapTask.getInputRecords();
+      expMapOutputRecs += mapTask.getOutputRecords();
+    }
+    mapCounters.put("MAP_INPUT_BYTES", expMapInputBytes);
+    mapCounters.put("MAP_OUTPUT_BYTES", expMapOutputBytes);
+    mapCounters.put("MAP_INPUT_RECS", expMapInputRecs);
+    mapCounters.put("MAP_OUTPUT_RECS", expMapOutputRecs);
+    return mapCounters;
+  }
+  
+  /**
+   * Get the original job reduce counters from a trace.
+   * @param zombieJob - Original job story.
+   * @return - reduce counters as a map.
+   */
+  public Map<String,Long> getJobReduceCounters(ZombieJob zombieJob) {
+    long expReduceInputBytes = 0;
+    long expReduceOutputBytes = 0;
+    long expReduceInputRecs = 0;
+    long expReduceOutputRecs = 0;
+    Map<String,Long> reduceCounters = new HashMap<String,Long>();
+    for (int index = 0; index < zombieJob.getNumberReduces(); index ++) {
+      TaskInfo reduceTask = zombieJob.getTaskInfo(TaskType.REDUCE, index);
+      expReduceInputBytes += reduceTask.getInputBytes();
+      expReduceOutputBytes += reduceTask.getOutputBytes();
+      expReduceInputRecs += reduceTask.getInputRecords();
+      expReduceOutputRecs += reduceTask.getOutputRecords();
+    }
+    reduceCounters.put("REDUCE_INPUT_BYTES", expReduceInputBytes);
+    reduceCounters.put("REDUCE_OUTPUT_BYTES", expReduceOutputBytes);
+    reduceCounters.put("REDUCE_INPUT_RECS", expReduceInputRecs);
+    reduceCounters.put("REDUCE_OUTPUT_RECS", expReduceOutputRecs);
+    return reduceCounters;
+  }
+
+  /**
+   * Get the simulated job configuration of a job.
+   * @param simulatedJobID - Simulated job id.
+   * @param tmpJHFolder - temporary job history folder location.
+   * @return - simulated job configuration.
+   * @throws IOException - If an I/O error occurs.
+   */
+  public JobConf getSimulatedJobConf(JobID simulatedJobID, File tmpJHFolder) 
+      throws IOException{
+    FileSystem fs = null;
+    try {
+      String historyFilePath = jtClient.getProxy().
+          getJobHistoryLocationForRetiredJob(simulatedJobID);
+      Path jhpath = new Path(historyFilePath);
+      fs = jhpath.getFileSystem(conf);
+      fs.copyToLocalFile(jhpath,new Path(tmpJHFolder.toString()));
+      fs.copyToLocalFile(new Path(historyFilePath + "_conf.xml"), 
+         new Path(tmpJHFolder.toString()));
+      JobConf jobConf = new JobConf();
+      jobConf.addResource(new Path(tmpJHFolder.toString() + "/" + 
+          simulatedJobID + "_conf.xml"));
+      jobConf.reloadConfiguration();
+      return jobConf;
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+    }
+  }  
+
+  /**
+   * Get the simulated job history of a job.
+   * @param simulatedJobID - simulated job id.
+   * @return - simulated job information.
+   * @throws IOException - if an I/O error occurs.
+   */
+  public JobHistoryParser.JobInfo getSimulatedJobHistory(JobID simulatedJobID) 
+      throws IOException {
+    FileSystem fs = null;
+    try {
+      String historyFilePath = jtClient.getProxy().
+          getJobHistoryLocationForRetiredJob(simulatedJobID);
+      Path jhpath = new Path(historyFilePath);
+      fs = jhpath.getFileSystem(conf);
+      JobHistoryParser jhparser = new JobHistoryParser(fs, jhpath);
+      JobHistoryParser.JobInfo jhInfo = jhparser.parse();
+      return jhInfo;
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
     }
   }
 

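The cross-job verification added above keys each distributed cache file by
path, timestamp and user, counts how often each key recurs across jobs, and
then compares the sorted occurrence counts of the simulated run against the
original trace. A standalone sketch of that counting-and-compare step (the
file^timestamp^user keys below are hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class DistCacheReuseCompare {
      static Map<String, Integer> countOccurrences(List<String> keys) {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        for (String key : keys) {
          Integer seen = counts.get(key);
          counts.put(key, seen == null ? 1 : seen + 1);
        }
        return counts;
      }

      public static void main(String[] args) {
        List<String> simu = Arrays.asList("a^10^user1", "a^10^user1", "b^20^user1");
        List<String> orig = Arrays.asList("x^30^user2", "y^40^user2", "y^40^user2");
        List<Integer> simuCounts =
            new ArrayList<Integer>(countOccurrences(simu).values());
        List<Integer> origCounts =
            new ArrayList<Integer>(countOccurrences(orig).values());
        Collections.sort(simuCounts);
        Collections.sort(origCounts);
        // Reuse patterns match iff the sorted count lists are equal.
        System.out.println(simuCounts.equals(origCounts));  // true: [1, 2] both
      }
    }
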
Modified: hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java?rev=1082633&r1=1082632&r2=1082633&view=diff
==============================================================================
--- hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java (original)
+++ hadoop/mapreduce/branches/yahoo-merge/src/contrib/gridmix/src/test/system/org/apache/hadoop/mapred/gridmix/test/system/UtilsForGridmix.java Thu Mar 17 18:44:24 2011
@@ -266,7 +266,8 @@ public class UtilsForGridmix {
      JobStatus js = jobStatus[numJobs - index];
      JobID jobid = js.getJobID();
      String jobName = js.getJobName();
-     if (!jobName.equals("GRIDMIX_GENERATE_INPUT_DATA")) {
+     if (!jobName.equals("GRIDMIX_GENERATE_INPUT_DATA") && 
+          !jobName.equals("GRIDMIX_GENERATE_DISTCACHE_DATA")) {
        jobids.add(jobid);
      }
    }
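
With distributed cache emulation in play, a gridmix run now submits a second
data-generation job, so the job-id listing above must skip both generator
names. A standalone sketch of the filter (job names other than the two
generator names are hypothetical):

    import java.util.ArrayList;
    import java.util.List;

    public class GridmixJobNameFilter {
      public static void main(String[] args) {
        String[] jobNames = {"GRIDMIX_GENERATE_INPUT_DATA",
            "GRIDMIX_GENERATE_DISTCACHE_DATA", "GRIDMIX00000", "GRIDMIX00001"};
        List<String> simulatedJobs = new ArrayList<String>();
        for (String name : jobNames) {
          // Keep only the simulated gridmix jobs; drop both generators.
          if (!name.equals("GRIDMIX_GENERATE_INPUT_DATA")
              && !name.equals("GRIDMIX_GENERATE_DISTCACHE_DATA")) {
            simulatedJobs.add(name);
          }
        }
        System.out.println(simulatedJobs);  // [GRIDMIX00000, GRIDMIX00001]
      }
    }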