Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:09:33 UTC

svn commit: r1077383 - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src: java/org/apache/hadoop/mapred/gridmix/ test/org/apache/hadoop/mapred/gridmix/

Author: omalley
Date: Fri Mar  4 04:09:33 2011
New Revision: 1077383

URL: http://svn.apache.org/viewvc?rev=1077383&view=rev
Log:
commit 5f9d982a5ed9525686877955b5dbed450468a746
Author: Rahul Kumar Singh <rk...@yahoo-inc.com>
Date:   Sat Apr 10 17:19:45 2010 +0530

     MAPREDUCE:1526 from https://issues.apache.org/jira/secure/attachment/12441333/1526-yhadoop-20-101-4.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-1526. Cache the job related information while submitting the job
    +
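
In short, the patch stops polling the JobTracker for per-job TaskReports and instead caches what it needs at submission time: the map count from the Rumen JobStory plus a handle to the submitted Job. A minimal stand-alone sketch of that cache, where JobHandle is a hypothetical stand-in for org.apache.hadoop.mapreduce.Job (the real code keys the map by the "gridmix.job.seq" value in the job conf):

    import java.util.Collection;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;

    // Sketch of the submission-time cache introduced by this patch.
    class JobStatsCache {
      static final class JobHandle {}          // stand-in for mapreduce.Job

      static final class JobStats {
        final int numMaps;                     // cached from the Rumen JobStory
        final JobHandle job;                   // handle to the submitted job
        JobStats(int numMaps, JobHandle job) {
          this.numMaps = numMaps;
          this.job = job;
        }
      }

      private final ConcurrentMap<Integer, JobStats> jobMaps =
          new ConcurrentHashMap<Integer, JobStats>();

      // Called once per job at submission, so later load checks need no RPCs.
      void addJobStats(int seq, int numMaps, JobHandle job) {
        if (seq < 0) {
          return;                              // job is not tracked by gridmix
        }
        jobMaps.put(seq, new JobStats(numMaps, job));
      }

      // Called on completion; the cached entry is dropped.
      JobStats removeJobStats(int seq) {
        return jobMaps.remove(seq);
      }

      // Consumed by the stress factory to estimate cluster load.
      Collection<JobStats> runningJobStats() {
        return jobMaps.values();
      }
    }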

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Gridmix.java Fri Mar  4 04:09:33 2011
@@ -159,7 +159,7 @@ public class Gridmix extends Configured 
       GridmixJobSubmissionPolicy policy = GridmixJobSubmissionPolicy.getPolicy(
         conf, GridmixJobSubmissionPolicy.STRESS);
       LOG.info(" Submission policy is " + policy.name());
-      statistics = new Statistics(conf, policy.getPollingInterval(), startFlag,userResolver);
+      statistics = new Statistics(conf, policy.getPollingInterval(), startFlag);
       monitor = createJobMonitor(statistics);
       int noOfSubmitterThreads = (policy == GridmixJobSubmissionPolicy.SERIAL) ? 1
           : Runtime.getRuntime().availableProcessors() + 1;
@@ -168,7 +168,7 @@ public class Gridmix extends Configured 
         monitor, conf.getInt(
           GRIDMIX_SUB_THR, noOfSubmitterThreads), conf.getInt(
           GRIDMIX_QUE_DEP, 5), new FilePool(
-          conf, ioPath), userResolver);
+          conf, ioPath), userResolver, statistics);
       
       factory = createJobFactory(
         submitter, traceIn, scratchDir, conf, startFlag, userResolver);
@@ -190,9 +190,10 @@ public class Gridmix extends Configured 
     return new JobMonitor(stats);
   }
 
-  protected JobSubmitter createJobSubmitter(JobMonitor monitor, int threads,
-      int queueDepth, FilePool pool,UserResolver resolver) throws IOException {
-    return new JobSubmitter(monitor, threads, queueDepth, pool, resolver);
+  protected JobSubmitter createJobSubmitter(
+    JobMonitor monitor, int threads, int queueDepth, FilePool pool,
+    UserResolver resolver, Statistics statistics) throws IOException {
+    return new JobSubmitter(monitor, threads, queueDepth, pool, statistics);
   }
 
   protected JobFactory createJobFactory(
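
The wiring above follows from that change: Statistics no longer takes the UserResolver (it stopped issuing per-user RPCs), while createJobSubmitter now receives the Statistics instance. A compile-only sketch of the protected-factory wiring, with hypothetical stub types standing in for the gridmix classes:

    // Stats, Monitor and Submitter are hypothetical stand-ins for
    // Statistics, JobMonitor and JobSubmitter.
    class DriverSketch {
      static class Stats {}
      static class Monitor { Monitor(Stats stats) {} }
      static class Submitter { Submitter(Monitor monitor, Stats stats) {} }

      // Protected factories let tests substitute their own implementations.
      protected Monitor createJobMonitor(Stats stats) {
        return new Monitor(stats);
      }
      protected Submitter createJobSubmitter(Monitor monitor, Stats stats) {
        return new Submitter(monitor, stats); // statistics reaches the submitter
      }

      void start() {
        Stats stats = new Stats();        // built before monitor and submitter
        Monitor monitor = createJobMonitor(stats);
        Submitter submitter = createJobSubmitter(monitor, stats);
      }
    }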

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/GridmixJob.java Fri Mar  4 04:09:33 2011
@@ -75,6 +75,7 @@ abstract class GridmixJob implements Cal
   protected final long submissionTimeNanos;
   private static final ConcurrentHashMap<Integer,List<InputSplit>> descCache =
      new ConcurrentHashMap<Integer,List<InputSplit>>();
+  protected static final String GRIDMIX_JOB_SEQ = "gridmix.job.seq";
 
   public GridmixJob(
     final Configuration conf, long submissionMillis, final JobStory jobdesc,
@@ -89,7 +90,7 @@ abstract class GridmixJob implements Cal
         public Job run() throws IOException {
           Job ret = new Job(conf, nameFormat.get().format("%05d", seq)
               .toString());
-          ret.getConfiguration().setInt("gridmix.job.seq", seq);
+          ret.getConfiguration().setInt(GRIDMIX_JOB_SEQ, seq);
           ret.getConfiguration().set(ORIGNAME,
               null == jobdesc.getJobID() ? "<unknown>" : jobdesc.getJobID()
                   .toString());
@@ -160,8 +161,7 @@ abstract class GridmixJob implements Cal
   }
 
   static List<InputSplit> pullDescription(JobContext jobCtxt) {
-    return pullDescription(jobCtxt.getConfiguration().getInt(
-        "gridmix.job.seq", -1));
+    return pullDescription(GridmixJob.getJobSeqId(jobCtxt));
   }
   
   static List<InputSplit> pullDescription(int seq) {
@@ -211,6 +211,10 @@ abstract class GridmixJob implements Cal
     return id();
   }
 
+  static int getJobSeqId(JobContext job) {
+    return job.getConfiguration().getInt(GRIDMIX_JOB_SEQ, -1);
+  }
+
   public static class DraftPartitioner<V> extends Partitioner<GridmixKey,V> {
     public int getPartition(GridmixKey key, V value, int numReduceTasks) {
       return key.getPartition();
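
With the literal replaced by GRIDMIX_JOB_SEQ and read through getJobSeqId(), the sequence id written at submission can be recovered anywhere a JobContext (and hence a Configuration) is available. A small sketch of that round trip using only the Configuration API; the constant mirrors the one added above and -1 is the "not a gridmix job" sentinel:

    import org.apache.hadoop.conf.Configuration;

    public class SeqIdRoundTrip {
      static final String GRIDMIX_JOB_SEQ = "gridmix.job.seq";

      public static void main(String[] args) {
        Configuration conf = new Configuration(false); // skip default resources
        conf.setInt(GRIDMIX_JOB_SEQ, 42);              // set at submission time
        System.out.println(conf.getInt(GRIDMIX_JOB_SEQ, -1));  // prints 42
        // a conf that was never tagged yields the sentinel:
        System.out.println(new Configuration(false).getInt(GRIDMIX_JOB_SEQ, -1));
      }
    }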

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/JobSubmitter.java Fri Mar  4 04:09:33 2011
@@ -17,6 +17,9 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import java.io.IOException;
 import java.nio.channels.ClosedByInterruptException;
 import java.util.concurrent.ExecutorService;
@@ -25,12 +28,6 @@ import java.util.concurrent.RejectedExec
 import java.util.concurrent.Semaphore;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.security.PrivilegedExceptionAction;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.security.UserGroupInformation;
 
 /**
  * Component accepting deserialized job traces, computing split data, and
@@ -43,12 +40,12 @@ class JobSubmitter implements Gridmix.Co
 
   public static final Log LOG = LogFactory.getLog(JobSubmitter.class);
 
-  final Semaphore sem;
+  private final Semaphore sem;
   private final FilePool inputDir;
   private final JobMonitor monitor;
+  private final Statistics statistics;
   private final ExecutorService sched;
   private volatile boolean shutdown = false;
-  private final UserResolver resolver;
 
   /**
    * Initialize the submission component with downstream monitor and pool of
@@ -59,16 +56,18 @@ class JobSubmitter implements Gridmix.Co
    * @param queueDepth Max depth of pending work queue
    *   See {@link Gridmix#GRIDMIX_QUE_DEP}.
    * @param inputDir Set of files from which split data may be mined for
-   *   synthetic jobs.
+   * synthetic jobs.
+   * @param statistics Statistics component that records each submitted job
    */
-  public JobSubmitter(JobMonitor monitor, int threads, int queueDepth,
-      FilePool inputDir, UserResolver resolver) {
+  public JobSubmitter(
+    JobMonitor monitor, int threads, int queueDepth, FilePool inputDir,
+    Statistics statistics) {
     sem = new Semaphore(queueDepth);
     sched = new ThreadPoolExecutor(threads, threads, 0L,
         TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
     this.inputDir = inputDir;
     this.monitor = monitor;
-    this.resolver = resolver;
+    this.statistics = statistics;
   }
 
   /**
@@ -106,6 +105,7 @@ class JobSubmitter implements Gridmix.Co
         try {
           // submit job
           monitor.add(job.call());
+          statistics.addJobStats(job.getJob(), job.getJobDesc());
           LOG.debug("SUBMIT " + job + "@" + System.currentTimeMillis() +
               " (" + job.getJob().getJobID() + ")");
         } catch (IOException e) {
@@ -132,7 +132,7 @@ class JobSubmitter implements Gridmix.Co
         monitor.submissionFailed(job.getJob());
       } finally {
         sem.release();
-      }                               
+      }
     }
   }
 
@@ -154,6 +154,7 @@ class JobSubmitter implements Gridmix.Co
 
   /**
    * (Re)scan the set of input files from which splits are derived.
+   * @throws java.io.IOException if the file pool cannot be refreshed
    */
   public void refreshFilePool() throws IOException {
     inputDir.refresh();
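
The submit path is unchanged apart from the extra bookkeeping: once monitor.add(job.call()) succeeds, the submitter hands the Job and its JobStory to Statistics. A compile-only sketch of that shape (semaphore-bounded queue, record-on-submit), with hypothetical stand-ins for the gridmix types:

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Semaphore;

    class SubmitterSketch {
      interface GridmixJobStub { void submit() throws Exception; }
      interface StatsRecorder { void addJobStats(GridmixJobStub job); }

      private final Semaphore sem;                    // bounds pending work
      private final ExecutorService sched = Executors.newFixedThreadPool(4);
      private final StatsRecorder statistics;

      SubmitterSketch(int queueDepth, StatsRecorder statistics) {
        this.sem = new Semaphore(queueDepth);
        this.statistics = statistics;
      }

      void add(final GridmixJobStub job) throws InterruptedException {
        sem.acquire();
        sched.execute(new Runnable() {
          public void run() {
            try {
              job.submit();                // analogous to monitor.add(job.call())
              statistics.addJobStats(job); // cache stats while submitting
            } catch (Exception e) {
              // the real code reports failures to the monitor
            } finally {
              sem.release();
            }
          }
        });
      }
    }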

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/Statistics.java Fri Mar  4 04:09:33 2011
@@ -17,29 +17,28 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
-import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobClient;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.ClusterStatus;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.gridmix.Gridmix.Component;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.mapred.TaskReport;
-import org.apache.hadoop.mapreduce.JobID;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.tools.rumen.JobStory;
 
+import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantLock;
 import java.util.concurrent.locks.Condition;
-import java.io.IOException;
+import java.util.concurrent.locks.ReentrantLock;
 
 /**
  * Component collecting the stats required by other components
@@ -63,6 +62,10 @@ public class Statistics implements Compo
   private final List<StatListener<JobStats>> jobStatListeners =
     new ArrayList<StatListener<JobStats>>();
 
+  // Map from gridmix job seq id to JobStats (job handle and number of maps)
+  private static final Map<Integer, JobStats> jobMaps =
+    new ConcurrentHashMap<Integer,JobStats>();
+
   private int completedJobsInCurrentInterval = 0;
   private final int jtPollingInterval;
   private volatile boolean shutdown = false;
@@ -72,37 +75,42 @@ public class Statistics implements Compo
   private final ReentrantLock lock = new ReentrantLock();
   private final Condition jobCompleted = lock.newCondition();
   private final CountDownLatch startFlag;
-  private final UserResolver userResolver;
-  private static Map<JobID, TaskReport[]> jobTaskReports =
-    new ConcurrentHashMap<JobID, TaskReport[]>();
 
   public Statistics(
-    final Configuration conf, int pollingInterval, CountDownLatch startFlag,UserResolver userResolver)
-    throws IOException {
+    final Configuration conf, int pollingInterval, CountDownLatch startFlag)
+    throws IOException, InterruptedException {
     UserGroupInformation ugi = UserGroupInformation.getLoginUser();
-    this.userResolver = userResolver;
-    try {
       this.cluster = ugi.doAs(new PrivilegedExceptionAction<JobClient>(){
-        public JobClient run() {
-          try {
-            return new JobClient(new JobConf(conf));
-          } catch (IOException e) {
-            LOG.error(" error while createing job client " + e.getMessage());
-          }
-          return null;
+        public JobClient run() throws IOException {
+          return new JobClient(new JobConf(conf));
         }
       });
-    } catch (InterruptedException e) {
-      LOG.error(" Exception in statisitics " + e.getMessage());
-    } catch (IOException e) {
-      LOG.error("Exception in statistics " + e.getMessage());
-    }
+
     this.jtPollingInterval = pollingInterval;
     maxJobCompletedInInterval = conf.getInt(
       MAX_JOBS_COMPLETED_IN_POLL_INTERVAL_KEY, 1);
     this.startFlag = startFlag;
   }
 
+  public void addJobStats(Job job, JobStory jobdesc) {
+    int seq = GridmixJob.getJobSeqId(job);
+    if (seq < 0) {
+      LOG.info("Not tracking job " + job.getJobName()
+          + " as seq id is less than zero: " + seq);
+      return;
+    }
+    
+    int maps = 0;
+    if (jobdesc == null) {
+      throw new IllegalArgumentException(
+        "JobStory not available for job " + job.getJobName());
+    } else {
+      maps = jobdesc.getNumberMaps();
+    }
+    JobStats stats = new JobStats(maps, job);
+    jobMaps.put(seq, stats);
+  }
+
   /**
    * Used by JobMonitor to add the completed job.
    */
@@ -114,27 +122,25 @@ public class Statistics implements Compo
     if(!statistics.isAlive()) {
       return;
     }
+    JobStats stat = jobMaps.remove(GridmixJob.getJobSeqId(job));
+
+    if (stat == null) return;
+    
     completedJobsInCurrentInterval++;
-    if (job.getJobID() != null) {
-      jobTaskReports.remove(job.getJobID());
-    }
+
     //check if we have reached the maximum level of job completions.
     if (completedJobsInCurrentInterval >= maxJobCompletedInInterval) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(
-          " Reached maximum limit of jobs in a polling interval " +
+          "Reached maximum limit of jobs in a polling interval " +
             completedJobsInCurrentInterval);
       }
       completedJobsInCurrentInterval = 0;
       lock.lock();
       try {
         //Job is completed notify all the listeners.
-        if (jobStatListeners.size() > 0) {
-          for (StatListener<JobStats> l : jobStatListeners) {
-            JobStats stats = new JobStats();
-            stats.setCompleteJob(job);
-            l.update(stats);
-          }
+        for (StatListener<JobStats> l : jobStatListeners) {
+          l.update(stat);
         }
         this.jobCompleted.signalAll();
       } finally {
@@ -145,7 +151,6 @@ public class Statistics implements Compo
 
   //TODO: We have just 2 types of listeners as of now . If no of listeners
   //increase then we should move to map kind of model.
-
   public void addClusterStatsObservers(StatListener<ClusterStats> listener) {
     clusterStatlisteners.add(listener);
   }
@@ -197,93 +202,26 @@ public class Statistics implements Compo
         if (clusterStatlisteners.size() > 0) {
           try {
             ClusterStatus clusterStatus = cluster.getClusterStatus();
-            JobStatus[] allJobs = cluster.getAllJobs();
-            List<JobStatus> runningWaitingJobs = getRunningWaitingJobs(allJobs);
-            getJobReports(runningWaitingJobs);
             updateAndNotifyClusterStatsListeners(
-              clusterStatus, runningWaitingJobs);
+              clusterStatus);
           } catch (IOException e) {
             LOG.error(
               "Statistics io exception while polling JT ", e);
             return;
-          } catch (InterruptedException e) {
-            LOG.error(
-              "Statistics interrupt exception while polling JT ", e);
-            return;
           }
         }
       }
     }
 
     private void updateAndNotifyClusterStatsListeners(
-      ClusterStatus clusterStatus, List<JobStatus> runningWaitingJobs) {
+      ClusterStatus clusterStatus) {
       ClusterStats stats = ClusterStats.getClusterStats();
       stats.setClusterMetric(clusterStatus);
-      stats.setRunningWaitingJobs(runningWaitingJobs);
       for (StatListener<ClusterStats> listener : clusterStatlisteners) {
         listener.update(stats);
       }
     }
 
-    private void getJobReports(List<JobStatus> jobs) throws IOException {
-      for (final JobStatus job : jobs) {
-
-        final UserGroupInformation user = userResolver.getTargetUgi(
-          UserGroupInformation.createRemoteUser(job.getUsername()));
-        try {
-          user.doAs(
-            new PrivilegedExceptionAction<Void>() {
-              public Void run() {
-                JobID id = job.getJobID();
-                if (!jobTaskReports.containsKey(id)) {
-                  try {
-                    jobTaskReports.put(
-                      id, cluster.getMapTaskReports(
-                        org.apache.hadoop.mapred.JobID.downgrade(id)));
-                  } catch (IOException e) {
-                    LOG.error(
-                      " Couldnt get the MapTaskResports for " + job.getJobId() +
-                        " job username "+ job.getUsername() +" cause " + user);
-                  }
-                }
-                return null;
-              }
-            });
-        } catch (InterruptedException e) {
-          LOG.error(
-            " Could nt get information for user " + user + " and job " +
-              job.getJobId());
-        } catch (IOException e) {
-          LOG.error(
-            " Could nt get information for user " + user + " and job " +
-              job.getJobId());
-          throw new IOException(e);
-        }
-      }
-    }
-
-    /**
-     * From the list of Jobs , give the list of jobs whoes state is eigther
-     * PREP or RUNNING.
-     *
-     * @param allJobs
-     * @return
-     * @throws java.io.IOException
-     * @throws InterruptedException
-     */
-    private List<JobStatus> getRunningWaitingJobs(JobStatus[] allJobs)
-      throws IOException, InterruptedException {
-      List<JobStatus> result = new ArrayList<JobStatus>();
-      for (JobStatus job : allJobs) {
-        //TODO Check if job.getStatus() makes a rpc call
-        int state = job.getRunState();
-        if (JobStatus.PREP == state || JobStatus.RUNNING == state) {
-          result.add(job);
-        }
-      }
-      return result;
-    }
-
   }
 
   /**
@@ -298,7 +236,7 @@ public class Statistics implements Compo
   @Override
   public void shutdown() {
     shutdown = true;
-    jobTaskReports.clear();
+    jobMaps.clear();
     clusterStatlisteners.clear();
     jobStatListeners.clear();
     statistics.interrupt();
@@ -307,7 +245,7 @@ public class Statistics implements Compo
   @Override
   public void abort() {
     shutdown = true;
-    jobTaskReports.clear();
+    jobMaps.clear();
     clusterStatlisteners.clear();
     jobStatListeners.clear();
     statistics.interrupt();
@@ -319,21 +257,31 @@ public class Statistics implements Compo
    * TODO: In future we need to extend this to send more information.
    */
   static class JobStats {
-    private Job completedJob;
+    private int noOfMaps;
+    private Job job;
 
-    public Job getCompleteJob() {
-      return completedJob;
+    public JobStats(int noOfMaps, Job job) {
+      this.job = job;
+      this.noOfMaps = noOfMaps;
+    }
+    public int getNoOfMaps() {
+      return noOfMaps;
     }
 
-    public void setCompleteJob(Job job) {
-      this.completedJob = job;
+    /**
+     * Returns the job. Note that job.getJobID() returns null in 20.1xx,
+     * so use GridmixJob.getJobSeqId(job) to identify the job instead.
+     * @return job
+     */
+    public Job getJob() {
+      return job;
     }
   }
 
   static class ClusterStats {
     private ClusterStatus status = null;
     private static ClusterStats stats = new ClusterStats();
-    private List<JobStatus> runningWaitingJobs;
 
     private ClusterStats() {
 
@@ -360,20 +308,15 @@ public class Statistics implements Compo
       return status;
     }
 
+    int getNumRunningJob() {
+      return jobMaps.size();
+    }
+
     /**
      * @return stats of the running jobs
      */
-    public List<JobStatus> getRunningWaitingJobs() {
-      return runningWaitingJobs;
-    }
-
-    public void setRunningWaitingJobs(List<JobStatus> runningWaitingJobs) {
-      this.runningWaitingJobs = runningWaitingJobs;
+    static Collection<JobStats> getRunningJobStats() {
+      return jobMaps.values();
     }
-
-    public Map<JobID, TaskReport[]> getJobReports() {
-      return jobTaskReports;
-    }
-
   }
 }
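
On the completion side, add(Job) now removes the cached JobStats by sequence id and hands that same object to the listeners, instead of constructing an empty JobStats per listener. A sketch of the notify-under-lock shape (interval counting omitted), generic over the stats type:

    import java.util.List;
    import java.util.concurrent.CopyOnWriteArrayList;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // StatListener mirrors the gridmix interface of the same name.
    class CompletionSketch<S> {
      interface StatListener<T> { void update(T item); }

      private final List<StatListener<S>> listeners =
          new CopyOnWriteArrayList<StatListener<S>>();
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition jobCompleted = lock.newCondition();

      void addListener(StatListener<S> listener) { listeners.add(listener); }

      // cachedStats is the entry removed from the map; null means untracked.
      void onJobComplete(S cachedStats) {
        if (cachedStats == null) {
          return;
        }
        lock.lock();
        try {
          for (StatListener<S> l : listeners) {
            l.update(cachedStats);     // no fresh stats object per listener
          }
          jobCompleted.signalAll();    // wakes threads awaiting completions
        } finally {
          lock.unlock();
        }
      }
    }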

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/StressJobFactory.java Fri Mar  4 04:09:33 2011
@@ -17,27 +17,26 @@
  */
 package org.apache.hadoop.mapred.gridmix;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.mapred.ClusterStatus;
-import org.apache.hadoop.mapred.JobStatus;
+import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
+import org.apache.hadoop.mapred.gridmix.Statistics.JobStats;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.tools.rumen.JobStory;
 import org.apache.hadoop.tools.rumen.JobStoryProducer;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 import java.io.IOException;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.locks.Condition;
-import java.util.List;
 
 public class StressJobFactory extends JobFactory<Statistics.ClusterStats> {
   public static final Log LOG = LogFactory.getLog(StressJobFactory.class);
 
   private LoadStatus loadStatus = new LoadStatus();
-  private List<JobStatus> runningWaitingJobs;
   private final Condition overloaded = this.lock.newCondition();
   /**
    * The minimum ratio between pending+running map tasks (aka. incomplete map
@@ -45,7 +44,7 @@ public class StressJobFactory extends Jo
    * overloaded. For running maps, we only count them partially. Namely, a 40%
    * completed map is counted as 0.6 map tasks in our calculation.
    */
-  static final float OVERLAOD_MAPTASK_MAPSLOT_RATIO = 2.0f;
+  static final float OVERLOAD_MAPTASK_MAPSLOT_RATIO = 2.0f;
 
   /**
    * Creating a new instance does not start the thread.
@@ -115,7 +114,7 @@ public class StressJobFactory extends Jo
             }
 
             int noOfSlotsAvailable = loadStatus.numSlotsBackfill;
-            LOG.info(" No of slots to be backfilled are " + noOfSlotsAvailable);
+            LOG.info("No of slots to be backfilled are " + noOfSlotsAvailable);
 
             for (int i = 0; i < noOfSlotsAvailable; i++) {
               try {
@@ -133,7 +132,7 @@ public class StressJobFactory extends Jo
                       UserGroupInformation.createRemoteUser(
                         job.getUser())), sequence.getAndIncrement()));
               } catch (IOException e) {
-                LOG.error(" EXCEPTOIN in availableSlots ", e);
+                LOG.error("Error while submitting the job ", e);
                 error = e;
                 return;
               }
@@ -152,7 +151,6 @@ public class StressJobFactory extends Jo
   }
 
   /**
-   * <p/>
    * STRESS Once you get the notification from StatsCollector.Collect the
    * clustermetrics. Update current loadStatus with new load status of JT.
    *
@@ -163,11 +161,11 @@ public class StressJobFactory extends Jo
     lock.lock();
     try {
       ClusterStatus clusterMetrics = item.getStatus();
-      LoadStatus newStatus;
-      runningWaitingJobs = item.getRunningWaitingJobs();
-      newStatus = checkLoadAndGetSlotsToBackfill(item, clusterMetrics);
-      loadStatus.isOverloaded = newStatus.isOverloaded;
-      loadStatus.numSlotsBackfill = newStatus.numSlotsBackfill;
+      try {
+        checkLoadAndGetSlotsToBackfill(item,clusterMetrics);
+      } catch (IOException e) {
+        LOG.error("Couldn't get the new Status",e);
+      }
       overloaded.signalAll();
     } finally {
       lock.unlock();
@@ -178,47 +176,44 @@ public class StressJobFactory extends Jo
    * We try to use some light-weight mechanism to determine cluster load.
    *
    * @param stats
-   * @param clusterStatus
-   * @return Whether, from job client perspective, the cluster is overloaded.
+   * @param clusterStatus Cluster status
+   * @throws java.io.IOException
    */
-  private LoadStatus checkLoadAndGetSlotsToBackfill(
-    Statistics.ClusterStats stats, ClusterStatus clusterStatus) {
-    LoadStatus loadStatus = new LoadStatus();
+  private void checkLoadAndGetSlotsToBackfill(
+    ClusterStats stats, ClusterStatus clusterStatus) throws IOException {
     // If there are more jobs than number of task trackers, we assume the
-    // cluster is overloaded. This is to bound the memory usage of the
-    // simulator job tracker, in situations where we have jobs with small
-    // number of map tasks and large number of reduce tasks.
-    if (runningWaitingJobs.size() >= clusterStatus.getTaskTrackers()) {
+    // cluster is overloaded. 
+    if (stats.getNumRunningJob() >= clusterStatus.getTaskTrackers()) {
       if (LOG.isDebugEnabled()) {
         LOG.debug(
           System.currentTimeMillis() + " Overloaded is " +
             Boolean.TRUE.toString() + " #runningJobs >= taskTrackerCount (" +
-            runningWaitingJobs.size() + " >= " +
+            stats.getNumRunningJob() + " >= " +
             clusterStatus.getTaskTrackers() + " )\n");
       }
       loadStatus.isOverloaded = true;
       loadStatus.numSlotsBackfill = 0;
-      return loadStatus;
+      return;
     }
 
     float incompleteMapTasks = 0; // include pending & running map tasks.
-    for (JobStatus job : runningWaitingJobs) {
-      incompleteMapTasks += (1 - Math.min(
-        job.mapProgress(), 1.0)) * stats.getJobReports().get(
-        job.getJobID()).length;
+    for (JobStats job : ClusterStats.getRunningJobStats()) {
+      float mapProgress = job.getJob().mapProgress();
+      int noOfMaps = job.getNoOfMaps();
+      incompleteMapTasks += (1 - Math.min(mapProgress, 1.0)) * noOfMaps;
     }
 
     float overloadedThreshold =
-      OVERLAOD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks();
+      OVERLOAD_MAPTASK_MAPSLOT_RATIO * clusterStatus.getMaxMapTasks();
     boolean overloaded = incompleteMapTasks > overloadedThreshold;
     String relOp = (overloaded) ? ">" : "<=";
     if (LOG.isDebugEnabled()) {
-      LOG.info(
+      LOG.debug(
         System.currentTimeMillis() + " Overloaded is " + Boolean.toString(
           overloaded) + " incompleteMapTasks " + relOp + " " +
-          OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
+          OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*mapSlotCapacity" + "(" +
           incompleteMapTasks + " " + relOp + " " +
-          OVERLAOD_MAPTASK_MAPSLOT_RATIO + "*" +
+          OVERLOAD_MAPTASK_MAPSLOT_RATIO + "*" +
           clusterStatus.getMaxMapTasks() + ")");
     }
     if (overloaded) {
@@ -233,12 +228,11 @@ public class StressJobFactory extends Jo
     if (LOG.isDebugEnabled()) {
       LOG.debug("Current load Status is " + loadStatus);
     }
-    return loadStatus;
   }
 
   static class LoadStatus {
-    volatile boolean isOverloaded = false;
-    volatile int numSlotsBackfill = -1;
+    boolean isOverloaded = false;
+    int numSlotsBackfill = -1;
 
     public String toString() {
       return " is Overloaded " + isOverloaded + " no of slots available " +

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/java/org/apache/hadoop/mapred/gridmix/UserResolver.java Fri Mar  4 04:09:33 2011
@@ -39,8 +39,7 @@ public interface UserResolver {
    * Configure the user map given the URI and configuration. The resolver's
    * contract will define how the resource will be interpreted, but the default
    * will typically interpret the URI as a {@link org.apache.hadoop.fs.Path}
-   * listing target users. The format of this file is defined by {@link
-   * #parseUserList}.
+   * listing target users. 
    * @param userdesc URI (possibly null) from which user information may be
    * loaded per the subclass contract.
    * @param conf The tool configuration.

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java?rev=1077383&r1=1077382&r2=1077383&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/gridmix/src/test/org/apache/hadoop/mapred/gridmix/TestGridmixSubmission.java Fri Mar  4 04:09:33 2011
@@ -352,7 +352,6 @@ public class TestGridmixSubmission {
     conf = new Configuration();
       conf.setEnum(GridmixJobSubmissionPolicy.JOB_SUBMISSION_POLICY,policy);
     conf = GridmixTestUtils.mrCluster.createJobConf(new JobConf(conf));
-//    GridmixTestUtils.createHomeAndStagingDirectory((JobConf)conf);
     // allow synthetic users to create home directories
     GridmixTestUtils.dfs.mkdirs(root, new FsPermission((short)0777));
     GridmixTestUtils.dfs.setPermission(root, new FsPermission((short)0777));