You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by am...@apache.org on 2011/09/28 13:49:35 UTC

svn commit: r1176835 - in /hadoop/common/branches/branch-0.20-security: ./ src/core/org/apache/hadoop/util/ src/mapred/ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/ src/test/org/apache/hadoop/mapred/ src/test/org/apache/...

Author: amarrk
Date: Wed Sep 28 11:49:34 2011
New Revision: 1176835

URL: http://svn.apache.org/viewvc?rev=1176835&view=rev
Log:
Backport of MAPREDUCE-220 and MAPREDUCE-2469. Includes adding cumulative CPU usage and total heap usage to task conters. (amarrk)

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java
    hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
    hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/Counter.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestCounters.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobCounters.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Wed Sep 28 11:49:34 2011
@@ -4,6 +4,9 @@ Release 0.20.206.0 - unreleased
 
   NEW FEATURES
 
+    MAPREDUCE-2777. Backport of MAPREDUCE-220 and MAPREDUCE-2469.
+                    Includes adding cumulative CPU usage and total heap 
+                    usage to task conters. (amarrk)
   BUG FIXES
 
     HDFS-2305. Running multiple 2NNs can result in corrupt file system. (atm)

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/LinuxMemoryCalculatorPlugin.java Wed Sep 28 11:49:34 2011
@@ -18,115 +18,28 @@
 
 package org.apache.hadoop.util;
 
-import java.io.BufferedReader;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.IOException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 /**
  * Plugin to calculate virtual and physical memories on Linux systems.
+ * @deprecated Use {@link org.apache.hadoop.util.LinuxResourceCalculatorPlugin}
+ *             instead
  */
+@Deprecated
 public class LinuxMemoryCalculatorPlugin extends MemoryCalculatorPlugin {
-  private static final Log LOG =
-      LogFactory.getLog(LinuxMemoryCalculatorPlugin.class);
-
-  /**
-   * proc's meminfo virtual file has keys-values in the format
-   * "key:[ \t]*value[ \t]kB".
-   */
-  private static final String PROCFS_MEMFILE = "/proc/meminfo";
-  private static final Pattern PROCFS_MEMFILE_FORMAT =
-      Pattern.compile("^([a-zA-Z]*):[ \t]*([0-9]*)[ \t]kB");
-
-  // We just need the values for the keys MemTotal and SwapTotal
-  private static final String MEMTOTAL_STRING = "MemTotal";
-  private static final String SWAPTOTAL_STRING = "SwapTotal";
-
-  private long ramSize = 0;
-  private long swapSize = 0;
-
-  boolean readMemInfoFile = false;
-
-  private void readProcMemInfoFile() {
-
-    if (readMemInfoFile) {
-      return;
-    }
-
-    // Read "/proc/memInfo" file
-    BufferedReader in = null;
-    FileReader fReader = null;
-    try {
-      fReader = new FileReader(PROCFS_MEMFILE);
-      in = new BufferedReader(fReader);
-    } catch (FileNotFoundException f) {
-      // shouldn't happen....
-      return;
-    }
-
-    Matcher mat = null;
-
-    try {
-      String str = in.readLine();
-      while (str != null) {
-        mat = PROCFS_MEMFILE_FORMAT.matcher(str);
-        if (mat.find()) {
-          if (mat.group(1).equals(MEMTOTAL_STRING)) {
-            ramSize = Long.parseLong(mat.group(2));
-          } else if (mat.group(1).equals(SWAPTOTAL_STRING)) {
-            swapSize = Long.parseLong(mat.group(2));
-          }
-        }
-        str = in.readLine();
-      }
-    } catch (IOException io) {
-      LOG.warn("Error reading the stream " + io);
-    } finally {
-      // Close the streams
-      try {
-        fReader.close();
-        try {
-          in.close();
-        } catch (IOException i) {
-          LOG.warn("Error closing the stream " + in);
-        }
-      } catch (IOException i) {
-        LOG.warn("Error closing the stream " + fReader);
-      }
-    }
-
-    readMemInfoFile = true;
+  private LinuxResourceCalculatorPlugin resourceCalculatorPlugin;
+  // Use everything from LinuxResourceCalculatorPlugin
+  public LinuxMemoryCalculatorPlugin() {
+    resourceCalculatorPlugin = new LinuxResourceCalculatorPlugin();
   }
 
   /** {@inheritDoc} */
   @Override
   public long getPhysicalMemorySize() {
-    readProcMemInfoFile();
-    return ramSize * 1024;
+    return resourceCalculatorPlugin.getPhysicalMemorySize();
   }
 
   /** {@inheritDoc} */
   @Override
   public long getVirtualMemorySize() {
-    readProcMemInfoFile();
-    return (ramSize + swapSize) * 1024;
-  }
-
-  /**
-   * Test the {@link LinuxMemoryCalculatorPlugin}
-   * 
-   * @param args
-   */
-  public static void main(String[] args) {
-    LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
-    System.out.println("Physical memory Size(bytes) : "
-        + plugin.getPhysicalMemorySize());
-    System.out.println("Total Virtual memory Size(bytes) : "
-        + plugin.getVirtualMemorySize());
+    return resourceCalculatorPlugin.getVirtualMemorySize();
   }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/MemoryCalculatorPlugin.java Wed Sep 28 11:49:34 2011
@@ -24,7 +24,11 @@ import org.apache.hadoop.conf.Configured
 /**
  * Plugin to calculate virtual and physical memories on the system.
  * 
+ * @deprecated Use
+ *             {@link org.apache.hadoop.util.ResourceCalculatorPlugin}
+ *             instead
  */
+@Deprecated
 public abstract class MemoryCalculatorPlugin extends Configured {
 
   /**

Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/util/ProcfsBasedProcessTree.java Wed Sep 28 11:49:34 2011
@@ -33,7 +33,7 @@ import java.util.LinkedList;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
 
 /**
  * A Proc file-system based ProcessTree. Works only on Linux.
@@ -44,17 +44,49 @@ public class ProcfsBasedProcessTree exte
       .getLog(ProcfsBasedProcessTree.class);
 
   private static final String PROCFS = "/proc/";
-  private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern
-      .compile("^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+\\s){16}([0-9]+)(\\s[0-9-]+){16}");
+  private static final Pattern PROCFS_STAT_FILE_FORMAT = Pattern .compile(
+    "^([0-9-]+)\\s([^\\s]+)\\s[^\\s]\\s([0-9-]+)\\s([0-9-]+)\\s([0-9-]+)\\s" +
+    "([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)\\s([0-9-]+\\s){7}([0-9]+)\\s([0-9]+)" +
+    "(\\s[0-9-]+){15}");
 
   static final String PROCFS_STAT_FILE = "stat";
   static final String PROCFS_CMDLINE_FILE = "cmdline";
+  public static final long PAGE_SIZE;
+  static {
+    ShellCommandExecutor shellExecutor =
+            new ShellCommandExecutor(new String[]{"getconf",  "PAGESIZE"});
+    long pageSize = -1;
+    try {
+      shellExecutor.execute();
+      pageSize = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+    } finally {
+      PAGE_SIZE = pageSize;
+    }
+  }
+  public static final long JIFFY_LENGTH_IN_MILLIS; // in millisecond
+  static {
+    ShellCommandExecutor shellExecutor =
+            new ShellCommandExecutor(new String[]{"getconf",  "CLK_TCK"});
+    long jiffiesPerSecond = -1;
+    try {
+      shellExecutor.execute();
+      jiffiesPerSecond = Long.parseLong(shellExecutor.getOutput().replace("\n", ""));
+    } catch (IOException e) {
+      LOG.error(StringUtils.stringifyException(e));
+    } finally {
+      JIFFY_LENGTH_IN_MILLIS = jiffiesPerSecond != -1 ?
+                     Math.round(1000D / jiffiesPerSecond) : -1;
+    }
+  }
 
   // to enable testing, using this variable which can be configured
   // to a test directory.
   private String procfsDir;
   
   private Integer pid = -1;
+  private Long cpuTime = 0L;
 
   private Map<Integer, ProcessInfo> processTree = new HashMap<Integer, ProcessInfo>();
 
@@ -149,11 +181,12 @@ public class ProcfsBasedProcessTree exte
         pInfoQueue.addAll(pInfo.getChildren());
       }
 
-      // update age values.
+      // update age values and compute the number of jiffies since last update
       for (Map.Entry<Integer, ProcessInfo> procs : processTree.entrySet()) {
         ProcessInfo oldInfo = oldProcs.get(procs.getKey());
-        if (oldInfo != null) {
-          if (procs.getValue() != null) {
+        if (procs.getValue() != null) {
+          procs.getValue().updateJiffy(oldInfo);
+          if (oldInfo != null) {
             procs.getValue().updateAge(oldInfo);  
           }
         }
@@ -196,7 +229,7 @@ public class ProcfsBasedProcessTree exte
   }
 
   private static final String PROCESSTREE_DUMP_FORMAT =
-      "\t|- %d %d %d %d %s %d %s\n";
+      "\t|- %d %d %d %d %s %d %d %d %d %s\n";
 
   /**
    * Get a dump of the process-tree.
@@ -208,12 +241,14 @@ public class ProcfsBasedProcessTree exte
     StringBuilder ret = new StringBuilder();
     // The header.
     ret.append(String.format("\t|- PID PPID PGRPID SESSID CMD_NAME "
-        + "VMEM_USAGE(BYTES) FULL_CMD_LINE\n"));
+        + "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) "
+        + "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
     for (ProcessInfo p : processTree.values()) {
       if (p != null) {
         ret.append(String.format(PROCESSTREE_DUMP_FORMAT, p.getPid(), p
             .getPpid(), p.getPgrpId(), p.getSessionId(), p.getName(), p
-            .getVmem(), p.getCmdLine(procfsDir)));
+            .getUtime(), p.getStime(), p.getVmem(), p.getRssmemPage(), p
+            .getCmdLine(procfsDir)));
       }
     }
     return ret.toString();
@@ -231,6 +266,18 @@ public class ProcfsBasedProcessTree exte
   }
 
   /**
+   * Get the cumulative resident set size (rss) memory used by all the processes
+   * in the process-tree.
+   *
+   * @return cumulative rss memory used by the process-tree in bytes. return 0
+   *         if it cannot be calculated
+   */
+  public long getCumulativeRssmem() {
+    // include all processes.. all processes will be older than 0.
+    return getCumulativeRssmem(0);
+  }
+
+  /**
    * Get the cumulative virtual memory used by all the processes in the
    * process-tree that are older than the passed in age.
    * 
@@ -249,6 +296,50 @@ public class ProcfsBasedProcessTree exte
     return total;
   }
 
+  /**
+   * Get the cumulative resident set size (rss) memory used by all the processes
+   * in the process-tree that are older than the passed in age.
+   *
+   * @param olderThanAge processes above this age are included in the
+   *                      memory addition
+   * @return cumulative rss memory used by the process-tree in bytes,
+   *          for processes older than this age. return 0 if it cannot be
+   *          calculated
+   */
+  public long getCumulativeRssmem(int olderThanAge) {
+    if (PAGE_SIZE < 0) {
+      return 0;
+    }
+    long totalPages = 0;
+    for (ProcessInfo p : processTree.values()) {
+      if ((p != null) && (p.getAge() > olderThanAge)) {
+        totalPages += p.getRssmemPage();
+      }
+    }
+    return totalPages * PAGE_SIZE; // convert # pages to byte
+  }
+
+  /**
+   * Get the CPU time in millisecond used by all the processes in the
+   * process-tree since the process-tree created
+   *
+   * @return cumulative CPU time in millisecond since the process-tree created
+   *         return 0 if it cannot be calculated
+   */
+  public long getCumulativeCpuTime() {
+    if (JIFFY_LENGTH_IN_MILLIS < 0) {
+      return 0;
+    }
+    long incJiffies = 0;
+    for (ProcessInfo p : processTree.values()) {
+      if (p != null) {
+        incJiffies += p.dtime;
+      }
+    }
+    cpuTime += incJiffies * JIFFY_LENGTH_IN_MILLIS;
+    return cpuTime;
+  }
+
   private static Integer getValidPID(String pid) {
     Integer retPid = -1;
     try {
@@ -318,10 +409,11 @@ public class ProcfsBasedProcessTree exte
       Matcher m = PROCFS_STAT_FILE_FORMAT.matcher(str);
       boolean mat = m.find();
       if (mat) {
-        // Set ( name ) ( ppid ) ( pgrpId ) (session ) (vsize )
-        pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)), Integer
-            .parseInt(m.group(4)), Integer.parseInt(m.group(5)), Long
-            .parseLong(m.group(7)));
+        // Set (name) (ppid) (pgrpId) (session) (utime) (stime) (vsize) (rss)
+         pinfo.updateProcessInfo(m.group(2), Integer.parseInt(m.group(3)),
+                 Integer.parseInt(m.group(4)), Integer.parseInt(m.group(5)),
+                 Long.parseLong(m.group(7)), Long.parseLong(m.group(8)),
+                 Long.parseLong(m.group(10)), Long.parseLong(m.group(11)));
       }
     } catch (IOException io) {
       LOG.warn("Error reading the stream " + io);
@@ -368,8 +460,18 @@ public class ProcfsBasedProcessTree exte
     private Integer ppid; // parent process-id
     private Integer sessionId; // session-id
     private Long vmem; // virtual memory usage
+    private Long rssmemPage; // rss memory usage in # of pages
+    private Long utime = 0L; // # of jiffies in user mode
+    private Long stime = 0L; // # of jiffies in kernel mode
     // how many times has this process been seen alive
     private int age; 
+
+    // # of jiffies used since last update:
+    private Long dtime = 0L;
+    // dtime = (utime + stime) - (utimeOld + stimeOld)
+    // We need this to compute the cumulative CPU time
+    // because the subprocess may finish earlier than root process
+
     private List<ProcessInfo> children = new ArrayList<ProcessInfo>(); // list of children
 
     public ProcessInfo(int pid) {
@@ -402,17 +504,41 @@ public class ProcfsBasedProcessTree exte
       return vmem;
     }
 
+    public Long getUtime() {
+      return utime;
+    }
+
+    public Long getStime() {
+      return stime;
+    }
+
+    public Long getDtime() {
+      return dtime;
+    }
+
+    public Long getRssmemPage() { // get rss # of pages
+      return rssmemPage;
+    }
+
     public int getAge() {
       return age;
     }
     
     public void updateProcessInfo(String name, Integer ppid, Integer pgrpId,
-        Integer sessionId, Long vmem) {
+        Integer sessionId, Long utime, Long stime, Long vmem, Long rssmem) {
       this.name = name;
       this.ppid = ppid;
       this.pgrpId = pgrpId;
       this.sessionId = sessionId;
+      this.utime = utime;
+      this.stime = stime;
       this.vmem = vmem;
+      this.rssmemPage = rssmem;
+    }
+
+    public void updateJiffy(ProcessInfo oldInfo) {
+      this.dtime = (oldInfo == null ? this.utime + this.stime
+              : (this.utime + this.stime) - (oldInfo.utime + oldInfo.stime));
     }
 
     public void updateAge(ProcessInfo oldInfo) {

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/mapred-default.xml Wed Sep 28 11:49:34 2011
@@ -186,14 +186,14 @@
 -->
 
 <property>
-  <name>mapred.tasktracker.memory_calculator_plugin</name>
+  <name>mapred.tasktracker.resourcecalculatorplugin</name>
   <value></value>
   <description>
-   Name of the class whose instance will be used to query memory information
+   Name of the class whose instance will be used to query resource information
    on the tasktracker.
    
    The class must be an instance of 
-   org.apache.hadoop.util.MemoryCalculatorPlugin. If the value is null, the
+   org.apache.hadoop.util.ResourceCalculatorPlugin. If the value is null, the
    tasktracker attempts to use a class appropriate to the platform. 
    Currently, the only platform supported is Linux.
   </description>

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Wed Sep 28 11:49:34 2011
@@ -69,8 +69,10 @@ interface InterTrackerProtocol extends V
    * Version 26: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    * Version 27: Adding node health status to TaskStatus for MAPREDUCE-211
    * Version 28: Adding user name to the serialized Task for use by TT.
+   * Version 29: Adding available memory and CPU usage information on TT to
+   *             TaskTrackerStatus for MAPREDUCE-1218
    */
-  public static final long versionID = 28L;
+  public static final long versionID = 29L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java Wed Sep 28 11:49:34 2011
@@ -4475,9 +4475,9 @@ public class JobTracker implements MRCon
     }
   }
   
-  static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
+  public static final String MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY =
       "mapred.cluster.map.memory.mb";
-  static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
+  public static final String MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY =
       "mapred.cluster.reduce.memory.mb";
 
   static final String MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY =

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task.java Wed Sep 28 11:49:34 2011
@@ -50,10 +50,11 @@ import org.apache.hadoop.io.serializer.S
 import org.apache.hadoop.mapred.IFile.Writer;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.ResourceCalculatorPlugin;
+import org.apache.hadoop.util.ResourceCalculatorPlugin.ProcResourceValues;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.fs.FSDataInputStream;
 
@@ -85,7 +86,11 @@ abstract public class Task implements Wr
     REDUCE_SKIPPED_GROUPS,
     REDUCE_SKIPPED_RECORDS,
     SPILLED_RECORDS,
-    SPLIT_RAW_BYTES
+    SPLIT_RAW_BYTES,
+    CPU_MILLISECONDS,
+    PHYSICAL_MEMORY_BYTES,
+    VIRTUAL_MEMORY_BYTES,
+    COMMITTED_HEAP_BYTES
   }
   
   /**
@@ -143,6 +148,9 @@ abstract public class Task implements Wr
   private Iterator<Long> currentRecIndexIterator = 
     skipRanges.skipRangeIterator();
   
+  private ResourceCalculatorPlugin resourceCalculator = null;
+  private long initCpuCumulativeTime = 0;
+
   protected JobConf conf;
   protected MapOutputFile mapOutputFile = new MapOutputFile();
   protected LocalDirAllocator lDirAlloc;
@@ -518,6 +526,16 @@ abstract public class Task implements Wr
       }
     }
     committer.setupTask(taskContext);
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        conf.getClass(TaskTracker.TT_RESOURCE_CALCULATOR_PLUGIN,
+            null, ResourceCalculatorPlugin.class);
+    resourceCalculator = ResourceCalculatorPlugin
+            .getResourceCalculatorPlugin(clazz, conf);
+    LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculator);
+    if (resourceCalculator != null) {
+      initCpuCumulativeTime =
+        resourceCalculator.getProcResourceValues().getCumulativeCpuTime();
+    }
   }
   
   protected class TaskReporter 
@@ -698,6 +716,7 @@ abstract public class Task implements Wr
       }
     }
     public void stopCommunicationThread() throws InterruptedException {
+      // Updating resources specified in ResourceCalculatorPlugin
       if (pingThread != null) {
         synchronized(lock) {
           while(!done) {
@@ -776,6 +795,27 @@ abstract public class Task implements Wr
   private Map<String, FileSystemStatisticUpdater> statisticUpdaters =
      new HashMap<String, FileSystemStatisticUpdater>();
   
+  /**
+   * Update resource information counters
+   */
+   void updateResourceCounters() {
+     // Update generic resource counters
+     updateHeapUsageCounter();
+     
+     if (resourceCalculator == null) {
+       return;
+     }
+     ProcResourceValues res = resourceCalculator.getProcResourceValues();
+     long cpuTime = res.getCumulativeCpuTime();
+     long pMem = res.getPhysicalMemorySize();
+     long vMem = res.getVirtualMemorySize();
+     // Remove the CPU time consumed previously by JVM reuse
+     cpuTime -= initCpuCumulativeTime;
+     counters.findCounter(Counter.CPU_MILLISECONDS).setValue(cpuTime);
+     counters.findCounter(Counter.PHYSICAL_MEMORY_BYTES).setValue(pMem);
+     counters.findCounter(Counter.VIRTUAL_MEMORY_BYTES).setValue(vMem);
+   }
+  
   private synchronized void updateCounters() {
     for(Statistics stat: FileSystem.getAllStatistics()) {
       String uriScheme = stat.getScheme();
@@ -786,6 +826,19 @@ abstract public class Task implements Wr
       }
       updater.updateCounters();      
     }
+    // TODO Should CPU related counters be update only once i.e in the end
+    updateResourceCounters();
+  }
+
+  /**
+   * Updates the {@link TaskCounter#COMMITTED_HEAP_BYTES} counter to reflect the
+   * current total committed heap space usage of this JVM.
+   */
+  @SuppressWarnings("deprecation")
+  private void updateHeapUsageCounter() {
+    long currentHeapUsage = Runtime.getRuntime().totalMemory();
+    counters.findCounter(Counter.COMMITTED_HEAP_BYTES)
+            .setValue(currentHeapUsage);
   }
 
   public void done(TaskUmbilicalProtocol umbilical,

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Wed Sep 28 11:49:34 2011
@@ -92,6 +92,7 @@ import org.apache.hadoop.security.author
 import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
 import org.apache.hadoop.util.DiskChecker;
 import org.apache.hadoop.util.MemoryCalculatorPlugin;
+import org.apache.hadoop.util.ResourceCalculatorPlugin;
 import org.apache.hadoop.util.ProcfsBasedProcessTree;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -347,11 +348,14 @@ public class TaskTracker implements MRCo
   private long mapSlotMemorySizeOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long reduceSlotSizeMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
   private long totalMemoryAllottedForTasks = JobConf.DISABLED_MEMORY_LIMIT;
+  private ResourceCalculatorPlugin resourceCalculatorPlugin = null;
 
   private UserLogManager userLogManager;
 
   static final String MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY =
       "mapred.tasktracker.memory_calculator_plugin";
+  public static final String TT_RESOURCE_CALCULATOR_PLUGIN = 
+      "mapreduce.tasktracker.resourcecalculatorplugin";
 
   /**
    * the minimum interval between jobtracker polls
@@ -802,6 +806,12 @@ public class TaskTracker implements MRCo
                              taskTrackerName);
     mapEventsFetcher.start();
 
+    Class<? extends ResourceCalculatorPlugin> clazz =
+        fConf.getClass(TT_RESOURCE_CALCULATOR_PLUGIN,
+                       null, ResourceCalculatorPlugin.class);
+    resourceCalculatorPlugin = 
+      ResourceCalculatorPlugin.getResourceCalculatorPlugin(clazz, fConf);
+    LOG.info(" Using ResourceCalculatorPlugin : " + resourceCalculatorPlugin);
     initializeMemoryManagement();
 
     getUserLogManager().clearOldUserLogs(fConf);
@@ -1741,6 +1751,12 @@ public class TaskTracker implements MRCo
       long freeDiskSpace = getFreeSpace();
       long totVmem = getTotalVirtualMemoryOnTT();
       long totPmem = getTotalPhysicalMemoryOnTT();
+      long availableVmem = getAvailableVirtualMemoryOnTT();
+      long availablePmem = getAvailablePhysicalMemoryOnTT();
+      long cumuCpuTime = getCumulativeCpuTimeOnTT();
+      long cpuFreq = getCpuFrequencyOnTT();
+      int numCpu = getNumProcessorsOnTT();
+      float cpuUsage = getCpuUsageOnTT();
 
       status.getResourceStatus().setAvailableSpace(freeDiskSpace);
       status.getResourceStatus().setTotalVirtualMemory(totVmem);
@@ -1749,6 +1765,12 @@ public class TaskTracker implements MRCo
           mapSlotMemorySizeOnTT);
       status.getResourceStatus().setReduceSlotMemorySizeOnTT(
           reduceSlotSizeMemoryOnTT);
+      status.getResourceStatus().setAvailableVirtualMemory(availableVmem); 
+      status.getResourceStatus().setAvailablePhysicalMemory(availablePmem);
+      status.getResourceStatus().setCumulativeCpuTime(cumuCpuTime);
+      status.getResourceStatus().setCpuFrequency(cpuFreq);
+      status.getResourceStatus().setNumProcessors(numCpu);
+      status.getResourceStatus().setCpuUsage(cpuUsage);
     }
     //add node health information
     
@@ -1821,6 +1843,80 @@ public class TaskTracker implements MRCo
     return totalPhysicalMemoryOnTT;
   }
 
+  /**
+   * Return the free virtual memory available on this TaskTracker.
+   * @return total size of free virtual memory.
+   */
+  long getAvailableVirtualMemoryOnTT() {
+    long availableVirtualMemoryOnTT = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      availableVirtualMemoryOnTT =
+              resourceCalculatorPlugin.getAvailableVirtualMemorySize();
+    }
+    return availableVirtualMemoryOnTT;
+  }
+
+  /**
+   * Return the free physical memory available on this TaskTracker.
+   * @return total size of free physical memory in bytes
+   */
+  long getAvailablePhysicalMemoryOnTT() {
+    long availablePhysicalMemoryOnTT = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      availablePhysicalMemoryOnTT =
+              resourceCalculatorPlugin.getAvailablePhysicalMemorySize();
+    }
+    return availablePhysicalMemoryOnTT;
+  }
+
+  /**
+   * Return the cumulative CPU used time on this TaskTracker since system is on
+   * @return cumulative CPU used time in millisecond
+   */
+  long getCumulativeCpuTimeOnTT() {
+    long cumulativeCpuTime = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cumulativeCpuTime = resourceCalculatorPlugin.getCumulativeCpuTime();
+    }
+    return cumulativeCpuTime;
+  }
+
+  /**
+   * Return the number of Processors on this TaskTracker
+   * @return number of processors
+   */
+  int getNumProcessorsOnTT() {
+    int numProcessors = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      numProcessors = resourceCalculatorPlugin.getNumProcessors();
+    }
+    return numProcessors;
+  }
+
+  /**
+   * Return the CPU frequency of this TaskTracker
+   * @return CPU frequency in kHz
+   */
+  long getCpuFrequencyOnTT() {
+    long cpuFrequency = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cpuFrequency = resourceCalculatorPlugin.getCpuFrequency();
+    }
+    return cpuFrequency;
+  }
+
+  /**
+   * Return the CPU usage in % of this TaskTracker
+   * @return CPU usage in %
+   */
+  float getCpuUsageOnTT() {
+    float cpuUsage = TaskTrackerStatus.UNAVAILABLE;
+    if (resourceCalculatorPlugin != null) {
+      cpuUsage = resourceCalculatorPlugin.getCpuUsage();
+    }
+    return cpuUsage;
+  }
+  
   long getTotalMemoryAllottedForTasksOnTT() {
     return totalMemoryAllottedForTasks;
   }
@@ -3976,25 +4072,31 @@ public class TaskTracker implements MRCo
           JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY));
     }
 
-    Class<? extends MemoryCalculatorPlugin> clazz =
-        fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-            null, MemoryCalculatorPlugin.class);
-    MemoryCalculatorPlugin memoryCalculatorPlugin =
-        MemoryCalculatorPlugin
-            .getMemoryCalculatorPlugin(clazz, fConf);
-    LOG.info(" Using MemoryCalculatorPlugin : " + memoryCalculatorPlugin);
-
-    if (memoryCalculatorPlugin != null) {
-      totalVirtualMemoryOnTT = memoryCalculatorPlugin.getVirtualMemorySize();
+    // Use TT_RESOURCE_CALCULATOR_PLUGIN if it is configured.
+    Class<? extends MemoryCalculatorPlugin> clazz = 
+      fConf.getClass(MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY, 
+                     null, MemoryCalculatorPlugin.class); 
+    MemoryCalculatorPlugin memoryCalculatorPlugin = 
+      (clazz == null 
+       ? null 
+       : MemoryCalculatorPlugin.getMemoryCalculatorPlugin(clazz, fConf)); 
+    if (memoryCalculatorPlugin != null || resourceCalculatorPlugin != null) {
+      totalVirtualMemoryOnTT = 
+        (memoryCalculatorPlugin == null 
+         ? resourceCalculatorPlugin.getVirtualMemorySize() 
+         : memoryCalculatorPlugin.getVirtualMemorySize());
       if (totalVirtualMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalVmem could not be calculated. "
-            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+                 + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalVirtualMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
-      totalPhysicalMemoryOnTT = memoryCalculatorPlugin.getPhysicalMemorySize();
+      totalPhysicalMemoryOnTT = 
+        (memoryCalculatorPlugin == null 
+         ? resourceCalculatorPlugin.getPhysicalMemorySize() 
+         : memoryCalculatorPlugin.getPhysicalMemorySize());
       if (totalPhysicalMemoryOnTT <= 0) {
         LOG.warn("TaskTracker's totalPmem could not be calculated. "
-            + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
+                 + "Setting it to " + JobConf.DISABLED_MEMORY_LIMIT);
         totalPhysicalMemoryOnTT = JobConf.DISABLED_MEMORY_LIMIT;
       }
     }

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed Sep 28 11:49:34 2011
@@ -55,6 +55,7 @@ public class TaskTrackerStatus implement
   private int maxReduceTasks;
   private TaskTrackerHealthStatus healthStatus;
    
+  public static final int UNAVAILABLE = -1;
   /**
    * Class representing a collection of resources on this tasktracker.
    */
@@ -66,6 +67,13 @@ public class TaskTrackerStatus implement
     private long reduceSlotMemorySizeOnTT;
     private long availableSpace;
     
+    private long availableVirtualMemory = UNAVAILABLE; // in byte
+    private long availablePhysicalMemory = UNAVAILABLE; // in byte
+    private int numProcessors = UNAVAILABLE;
+    private long cumulativeCpuTime = UNAVAILABLE; // in millisecond
+    private long cpuFrequency = UNAVAILABLE; // in kHz
+    private float cpuUsage = UNAVAILABLE; // in %
+
     ResourceStatus() {
       totalVirtualMemory = JobConf.DISABLED_MEMORY_LIMIT;
       totalPhysicalMemory = JobConf.DISABLED_MEMORY_LIMIT;
@@ -172,21 +180,160 @@ public class TaskTrackerStatus implement
     long getAvailableSpace() {
       return availableSpace;
     }
+
+    /**
+     * Set the amount of available virtual memory on the tasktracker.
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param vmem amount of available virtual memory on the tasktracker
+     *                    in bytes.
+     */
+    void setAvailableVirtualMemory(long availableMem) {
+      availableVirtualMemory = availableMem > 0 ?
+                               availableMem : UNAVAILABLE;
+    }
+
+    /**
+     * Get the amount of available virtual memory on the tasktracker.
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return the amount of available virtual memory on the tasktracker
+     *             in bytes.
+     */
+    long getAvailableVirtualMemory() {
+      return availableVirtualMemory;
+    }
+
+    /**
+     * Set the amount of available physical memory on the tasktracker.
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param availableRAM amount of available physical memory on the
+     *                     tasktracker in bytes.
+     */
+    void setAvailablePhysicalMemory(long availableRAM) {
+      availablePhysicalMemory = availableRAM > 0 ?
+                                availableRAM : UNAVAILABLE;
+    }
+
+    /**
+     * Get the amount of available physical memory on the tasktracker.
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return amount of available physical memory on the tasktracker in bytes.
+     */
+    long getAvailablePhysicalMemory() {
+      return availablePhysicalMemory;
+    }
+
+    /**
+     * Set the CPU frequency of this TaskTracker
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param cpuFrequency CPU frequency in kHz
+     */
+    public void setCpuFrequency(long cpuFrequency) {
+      this.cpuFrequency = cpuFrequency > 0 ?
+                          cpuFrequency : UNAVAILABLE;
+    }
+
+    /**
+     * Get the CPU frequency of this TaskTracker
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return CPU frequency in kHz
+     */
+    public long getCpuFrequency() {
+      return cpuFrequency;
+    }
+
+    /**
+     * Set the number of processors on this TaskTracker
+     * If the input is not a valid number, it will be set to UNAVAILABLE
+     *
+     * @param numProcessors number of processors
+     */
+    public void setNumProcessors(int numProcessors) {
+      this.numProcessors = numProcessors > 0 ?
+                           numProcessors : UNAVAILABLE;
+    }
+
+    /**
+     * Get the number of processors on this TaskTracker
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return number of processors
+     */
+    public int getNumProcessors() {
+      return numProcessors;
+    }
+
+    /**
+     * Set the cumulative CPU time on this TaskTracker since it is up
+     * It can be set to UNAVAILABLE if it is currently unavailable.
+     *
+     * @param cumulativeCpuTime Used CPU time in millisecond
+     */
+    public void setCumulativeCpuTime(long cumulativeCpuTime) {
+      this.cumulativeCpuTime = cumulativeCpuTime > 0 ?
+                               cumulativeCpuTime : UNAVAILABLE;
+    }
+
+    /**
+     * Get the cumulative CPU time on this TaskTracker since it is up
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return used CPU time in milliseconds
+     */
+    public long getCumulativeCpuTime() {
+      return cumulativeCpuTime;
+    }
+    
+    /**
+     * Set the CPU usage on this TaskTracker
+     * 
+     * @param cpuUsage CPU usage in %
+     */
+    public void setCpuUsage(float cpuUsage) {
+      this.cpuUsage = cpuUsage;
+    }
+
+    /**
+     * Get the CPU usage on this TaskTracker
+     * Will return UNAVAILABLE if it cannot be obtained
+     *
+     * @return CPU usage in %
+     */
+    public float getCpuUsage() {
+      return cpuUsage;
+    }
     
     public void write(DataOutput out) throws IOException {
       WritableUtils.writeVLong(out, totalVirtualMemory);
       WritableUtils.writeVLong(out, totalPhysicalMemory);
+      WritableUtils.writeVLong(out, availableVirtualMemory);
+      WritableUtils.writeVLong(out, availablePhysicalMemory);
       WritableUtils.writeVLong(out, mapSlotMemorySizeOnTT);
       WritableUtils.writeVLong(out, reduceSlotMemorySizeOnTT);
       WritableUtils.writeVLong(out, availableSpace);
+      WritableUtils.writeVLong(out, cumulativeCpuTime);
+      WritableUtils.writeVLong(out, cpuFrequency);
+      WritableUtils.writeVInt(out, numProcessors);
+      out.writeFloat(getCpuUsage());
     }
     
     public void readFields(DataInput in) throws IOException {
       totalVirtualMemory = WritableUtils.readVLong(in);
       totalPhysicalMemory = WritableUtils.readVLong(in);
+      availableVirtualMemory = WritableUtils.readVLong(in);
+      availablePhysicalMemory = WritableUtils.readVLong(in);
       mapSlotMemorySizeOnTT = WritableUtils.readVLong(in);
       reduceSlotMemorySizeOnTT = WritableUtils.readVLong(in);
       availableSpace = WritableUtils.readVLong(in);
+      cumulativeCpuTime = WritableUtils.readVLong(in);
+      cpuFrequency = WritableUtils.readVLong(in);
+      numProcessors = WritableUtils.readVInt(in);
+      setCpuUsage(in.readFloat());
     }
   }
   

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/Task_Counter.properties Wed Sep 28 11:49:34 2011
@@ -17,4 +17,8 @@ REDUCE_OUTPUT_RECORDS.name=    Reduce ou
 REDUCE_SKIPPED_RECORDS.name=   Reduce skipped records
 REDUCE_SKIPPED_GROUPS.name=    Reduce skipped groups
 SPILLED_RECORDS.name=          Spilled Records
+COMMITTED_HEAP_BYTES.name=     Total committed heap usage (bytes)
+CPU_MILLISECONDS.name=         CPU time spent (ms)
+PHYSICAL_MEMORY_BYTES.name=    Physical memory (bytes) snapshot
+VIRTUAL_MEMORY_BYTES.name=     Virtual memory (bytes) snapshot
 

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/Counter.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/Counter.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/Counter.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapreduce/Counter.java Wed Sep 28 11:49:34 2011
@@ -104,6 +104,14 @@ public class Counter implements Writable
   }
     
   /**
+   * Set this counter by the given value
+   * @param value the value to set
+   */
+  public synchronized void setValue(long value) {
+    this.value = value;
+  }
+
+  /**
    * Increment this counter by the given value
    * @param incr the value to increase this counter by
    */

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestCounters.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestCounters.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestCounters.java Wed Sep 28 11:49:34 2011
@@ -20,6 +20,9 @@ package org.apache.hadoop.mapred;
 import junit.framework.TestCase;
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.Random;
+
+import org.apache.hadoop.mapred.Counters.Counter;
 
 /**
  * TestCounters checks the sanity and recoverability of {@code Counters}
@@ -90,6 +93,33 @@ public class TestCounters extends TestCa
     }
   }
   
+  /**
+   * Verify counter value works
+   */
+  public void testCounterValue() {
+    final int NUMBER_TESTS = 100;
+    final int NUMBER_INC = 10;
+    final Random rand = new Random();
+    for (int i = 0; i < NUMBER_TESTS; i++) {
+      long initValue = rand.nextInt();
+      long expectedValue = initValue;
+      Counter counter = new Counter("foo", "bar", expectedValue);
+      assertEquals("Counter value is not initialized correctly",
+                   expectedValue, counter.getValue());
+      for (int j = 0; j < NUMBER_INC; j++) {
+        int incValue = rand.nextInt();
+        counter.increment(incValue);
+        expectedValue += incValue;
+        assertEquals("Counter value is not incremented correctly",
+                     expectedValue, counter.getValue());
+      }
+      expectedValue = rand.nextInt();
+      counter.setValue(expectedValue);
+      assertEquals("Counter value is not set correctly",
+                   expectedValue, counter.getValue());
+    }
+  }
+  
   public static void main(String[] args) throws IOException {
     new TestCounters().testCounters();
   }

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobCounters.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobCounters.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobCounters.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestJobCounters.java Wed Sep 28 11:49:34 2011
@@ -23,12 +23,12 @@ import java.io.FileWriter;
 import java.io.Writer;
 import java.io.BufferedWriter;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
 import java.util.StringTokenizer;
 
 import junit.framework.TestCase;
-import junit.extensions.TestSetup;
-import junit.framework.Test;
-import junit.framework.TestSuite;
 
 import static org.apache.hadoop.mapred.Task.Counter.SPILLED_RECORDS;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_RECORDS;
@@ -36,12 +36,17 @@ import static org.apache.hadoop.mapred.T
 import static org.apache.hadoop.mapred.Task.Counter.MAP_INPUT_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_BYTES;
 import static org.apache.hadoop.mapred.Task.Counter.MAP_OUTPUT_MATERIALIZED_BYTES;
+import static org.apache.hadoop.mapred.Task.Counter.COMMITTED_HEAP_BYTES;
 
+import org.apache.commons.lang.RandomStringUtils;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
@@ -450,4 +455,249 @@ public class TestJobCounters extends Tes
       }
     }
   }
+  
+  /** 
+   * Increases the JVM's heap usage to the specified target value.
+   */
+  static class MemoryLoader {
+    private static final int DEFAULT_UNIT_LOAD_SIZE = 10 * 1024 * 1024; // 10mb
+    
+    // the target value to reach
+    private long targetValue;
+    // a list to hold the load objects
+    private List<String> loadObjects = new ArrayList<String>();
+    
+    MemoryLoader(long targetValue) {
+      this.targetValue = targetValue;
+    }
+    
+    /**
+     * Loads the memory to the target value.
+     */
+    void load() {
+      while (Runtime.getRuntime().totalMemory() < targetValue) {
+        System.out.println("Loading memory with " + DEFAULT_UNIT_LOAD_SIZE 
+                           + " characters. Current usage : " 
+                           + Runtime.getRuntime().totalMemory());
+        // load some objects in the memory
+        loadObjects.add(RandomStringUtils.random(DEFAULT_UNIT_LOAD_SIZE));
+
+        // sleep for 100ms
+        try {
+          Thread.sleep(100);
+        } catch (InterruptedException ie) {}
+      }
+    }
+  }
+
+  /**
+   * A mapper that increases the JVM's heap usage to a target value configured 
+   * via {@link MemoryLoaderMapper#TARGET_VALUE} using a {@link MemoryLoader}.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  static class MemoryLoaderMapper 
+  extends MapReduceBase 
+  implements org.apache.hadoop.mapred.Mapper<WritableComparable, Writable, 
+                    WritableComparable, Writable> {
+    static final String TARGET_VALUE = "map.memory-loader.target-value";
+    
+    private static MemoryLoader loader = null;
+    
+    public void map(WritableComparable key, Writable val, 
+                    OutputCollector<WritableComparable, Writable> output,
+                    Reporter reporter)
+    throws IOException {
+      assertNotNull("Mapper not configured!", loader);
+      
+      // load the memory
+      loader.load();
+      
+      // work as identity mapper
+      output.collect(key, val);
+    }
+
+    public void configure(JobConf conf) {
+      loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
+    }
+  }
+
+  /** 
+   * A reducer that increases the JVM's heap usage to a target value configured 
+   * via {@link MemoryLoaderReducer#TARGET_VALUE} using a {@link MemoryLoader}.
+   */
+  @SuppressWarnings({"deprecation", "unchecked"})
+  static class MemoryLoaderReducer extends MapReduceBase 
+  implements org.apache.hadoop.mapred.Reducer<WritableComparable, Writable, 
+                     WritableComparable, Writable> {
+    static final String TARGET_VALUE = "reduce.memory-loader.target-value";
+    private static MemoryLoader loader = null;
+    
+    public void reduce(WritableComparable key, Iterator<Writable> val, 
+                       OutputCollector<WritableComparable, Writable> output,
+                       Reporter reporter)
+    throws IOException {
+      assertNotNull("Reducer not configured!", loader);
+      
+      // load the memory
+      loader.load();
+      
+      // work as identity reducer
+      output.collect(key, key);
+    }
+
+    public void configure(JobConf conf) {
+      loader = new MemoryLoader(conf.getLong(TARGET_VALUE, -1));
+    }
+  }
+
+  @SuppressWarnings("deprecation")
+  private long getTaskCounterUsage (JobClient client, JobID id, int numReports,
+                                    int taskId, boolean isMap) 
+  throws Exception {
+    TaskReport[] reports = null;
+    if (isMap) {
+      reports = client.getMapTaskReports(id);
+    } else {
+      reports = client.getReduceTaskReports(id);
+    }
+    
+    assertNotNull("No reports found for " + (isMap? "map" : "reduce") + " tasks" 
+                  + "' in job " + id, reports);
+    // make sure that the total number of reports match the expected
+    assertEquals("Mismatch in task id", numReports, reports.length);
+    
+    Counters counters = reports[taskId].getCounters();
+    
+    return counters.getCounter(COMMITTED_HEAP_BYTES);
+  }
+
+  // set up heap options, target value for memory loader and the output 
+  // directory before running the job
+  @SuppressWarnings("deprecation")
+  private static RunningJob runHeapUsageTestJob(JobConf conf, Path testRootDir,
+                              String heapOptions, long targetMapValue,
+                              long targetReduceValue, FileSystem fs, 
+                              JobClient client, Path inDir) 
+  throws IOException {
+    // define a job
+    JobConf jobConf = new JobConf(conf);
+    
+    // configure the jobs
+    jobConf.setNumMapTasks(1);
+    jobConf.setNumReduceTasks(1);
+    jobConf.setMapperClass(MemoryLoaderMapper.class);
+    jobConf.setReducerClass(MemoryLoaderReducer.class);
+    jobConf.setInputFormat(TextInputFormat.class);
+    jobConf.setOutputKeyClass(LongWritable.class);
+    jobConf.setOutputValueClass(Text.class);
+    jobConf.setMaxMapAttempts(1);
+    jobConf.setMaxReduceAttempts(1);
+    jobConf.set(JobConf.MAPRED_TASK_JAVA_OPTS, heapOptions);
+    
+    // set the targets
+    jobConf.setLong(MemoryLoaderMapper.TARGET_VALUE, targetMapValue);
+    jobConf.setLong(MemoryLoaderReducer.TARGET_VALUE, targetReduceValue);
+    
+    // set the input directory for the job
+    FileInputFormat.setInputPaths(jobConf, inDir);
+    
+    // define job output folder
+    Path outDir = new Path(testRootDir, "out");
+    fs.delete(outDir, true);
+    FileOutputFormat.setOutputPath(jobConf, outDir);
+    
+    // run the job
+    RunningJob job = client.submitJob(jobConf);
+    job.waitForCompletion();
+    JobID jobID = job.getID();
+    assertTrue("Job " + jobID + " failed!", job.isSuccessful());
+    
+    return job;
+  }
+
+  /**
+   * Tests {@link TaskCounter}'s {@link TaskCounter.COMMITTED_HEAP_BYTES}. 
+   * The test consists of running a low-memory job which consumes less heap 
+   * memory and then running a high-memory job which consumes more heap memory, 
+   * and then ensuring that COMMITTED_HEAP_BYTES of low-memory job is smaller 
+   * than that of the high-memory job.
+   * @throws IOException
+   */
+  @SuppressWarnings("deprecation")
+  public void testHeapUsageCounter() throws Exception {
+    JobConf conf = new JobConf();
+    // create a local filesystem handle
+    FileSystem fileSystem = FileSystem.getLocal(conf);
+    
+    // define test root directories
+    File rootDir =
+      new File(System.getProperty("test.build.data", "/tmp"));
+    File testRootDir = new File(rootDir, "testHeapUsageCounter");
+    // cleanup the test root directory
+    Path testRootDirPath = new Path(testRootDir.toString());
+    fileSystem.delete(testRootDirPath, true);
+    // set the current working directory
+    fileSystem.setWorkingDirectory(testRootDirPath);
+    
+    fileSystem.deleteOnExit(testRootDirPath);
+    
+    // create a mini cluster using the local file system
+    MiniMRCluster mrCluster = 
+      new MiniMRCluster(1, fileSystem.getUri().toString(), 1);
+    
+    try {
+      conf = mrCluster.createJobConf();
+      JobClient jobClient = new JobClient(conf);
+
+      // define job input
+      File file = new File(testRootDir, "in");
+      Path inDir = new Path(file.toString());
+      // create input data
+      createWordsFile(file);
+
+      // configure and run a low memory job which will run without loading the
+      // jvm's heap
+      RunningJob lowMemJob = 
+        runHeapUsageTestJob(conf, testRootDirPath, "-Xms32m -Xmx1G", 
+                            0, 0, fileSystem, jobClient, inDir);
+      JobID lowMemJobID = lowMemJob.getID();
+      long lowMemJobMapHeapUsage = getTaskCounterUsage(jobClient, lowMemJobID, 
+                                                       1, 0, true);
+      System.out.println("Job1 (low memory job) map task heap usage: " 
+                         + lowMemJobMapHeapUsage);
+      long lowMemJobReduceHeapUsage =
+        getTaskCounterUsage(jobClient, lowMemJobID, 1, 0, false);
+      System.out.println("Job1 (low memory job) reduce task heap usage: " 
+                         + lowMemJobReduceHeapUsage);
+
+      // configure and run a high memory job which will load the jvm's heap
+      RunningJob highMemJob = 
+        runHeapUsageTestJob(conf, testRootDirPath, "-Xms32m -Xmx1G", 
+                            lowMemJobMapHeapUsage + 256*1024*1024, 
+                            lowMemJobReduceHeapUsage + 256*1024*1024,
+                            fileSystem, jobClient, inDir);
+      JobID highMemJobID = highMemJob.getID();
+
+      long highMemJobMapHeapUsage = getTaskCounterUsage(jobClient, highMemJobID,
+                                                        1, 0, true);
+      System.out.println("Job2 (high memory job) map task heap usage: " 
+                         + highMemJobMapHeapUsage);
+      long highMemJobReduceHeapUsage =
+        getTaskCounterUsage(jobClient, highMemJobID, 1, 0, false);
+      System.out.println("Job2 (high memory job) reduce task heap usage: " 
+                         + highMemJobReduceHeapUsage);
+
+      assertTrue("Incorrect map heap usage reported by the map task", 
+                 lowMemJobMapHeapUsage < highMemJobMapHeapUsage);
+
+      assertTrue("Incorrect reduce heap usage reported by the reduce task", 
+                 lowMemJobReduceHeapUsage < highMemJobReduceHeapUsage);
+    } finally {
+      // shutdown the mr cluster
+      mrCluster.shutdown();
+      try {
+        fileSystem.delete(testRootDirPath, true);
+      } catch (IOException ioe) {} 
+    }
+  }
 }
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java Wed Sep 28 11:49:34 2011
@@ -1,230 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.mapred;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.examples.SleepJob;
-import org.apache.hadoop.util.LinuxMemoryCalculatorPlugin;
-import org.apache.hadoop.util.MemoryCalculatorPlugin;
-import org.apache.hadoop.util.ToolRunner;
-import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
-
-import junit.framework.TestCase;
-
-/**
- * This test class tests the functionality related to configuring, reporting
- * and computing memory related parameters in a Map/Reduce cluster.
- * 
- * Each test sets up a {@link MiniMRCluster} with a locally defined 
- * {@link org.apache.hadoop.mapred.TaskScheduler}. This scheduler validates 
- * the memory related configuration is correctly computed and reported from 
- * the tasktracker in 
- * {@link org.apache.hadoop.mapred.TaskScheduler#assignTasks(TaskTrackerStatus)}.
- */
-public class TestTTMemoryReporting extends TestCase {
-
-  static final Log LOG = LogFactory.getLog(TestTTMemoryReporting.class);
-  
-  private MiniMRCluster miniMRCluster;
-
-  /**
-   * Fake scheduler to test the proper reporting of memory values by TT
-   */
-  public static class FakeTaskScheduler extends JobQueueTaskScheduler {
-    
-    private boolean hasPassed = true;
-    private String message;
-    
-    public FakeTaskScheduler() {
-      super();
-    }
-    
-    public boolean hasTestPassed() {
-      return hasPassed;
-    }
-    
-    public String getFailureMessage() {
-      return message;
-    }
-    
-    @Override
-    public List<Task> assignTasks(TaskTracker taskTracker)
-        throws IOException {
-      TaskTrackerStatus status = taskTracker.getStatus();
-      
-      long totalVirtualMemoryOnTT =
-          getConf().getLong("totalVmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long totalPhysicalMemoryOnTT =
-          getConf().getLong("totalPmemOnTT", JobConf.DISABLED_MEMORY_LIMIT);
-      long mapSlotMemorySize =
-          getConf().getLong("mapSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
-      long reduceSlotMemorySize =
-          getConf()
-              .getLong("reduceSlotMemorySize", JobConf.DISABLED_MEMORY_LIMIT);
-
-      long reportedTotalVirtualMemoryOnTT =
-          status.getResourceStatus().getTotalVirtualMemory();
-      long reportedTotalPhysicalMemoryOnTT =
-          status.getResourceStatus().getTotalPhysicalMemory();
-      long reportedMapSlotMemorySize =
-          status.getResourceStatus().getMapSlotMemorySizeOnTT();
-      long reportedReduceSlotMemorySize =
-          status.getResourceStatus().getReduceSlotMemorySizeOnTT();
-
-      message =
-          "expected memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "mapSlotMemSize, reduceSlotMemorySize) = ("
-              + totalVirtualMemoryOnTT + ", " + totalPhysicalMemoryOnTT + ","
-              + mapSlotMemorySize + "," + reduceSlotMemorySize + ")";
-      message +=
-          "\nreported memory values : (totalVirtualMemoryOnTT, totalPhysicalMemoryOnTT, "
-              + "reportedMapSlotMemorySize, reportedReduceSlotMemorySize) = ("
-              + reportedTotalVirtualMemoryOnTT
-              + ", "
-              + reportedTotalPhysicalMemoryOnTT
-              + ","
-              + reportedMapSlotMemorySize
-              + ","
-              + reportedReduceSlotMemorySize
-              + ")";
-      LOG.info(message);
-      if (totalVirtualMemoryOnTT != reportedTotalVirtualMemoryOnTT
-          || totalPhysicalMemoryOnTT != reportedTotalPhysicalMemoryOnTT
-          || mapSlotMemorySize != reportedMapSlotMemorySize
-          || reduceSlotMemorySize != reportedReduceSlotMemorySize) {
-        hasPassed = false;
-      }
-      return super.assignTasks(taskTracker);
-    }
-  }
-
-  /**
-   * Test that verifies default values are configured and reported correctly.
-   * 
-   * @throws Exception
-   */
-  public void testDefaultMemoryValues()
-      throws Exception {
-    JobConf conf = new JobConf();
-    try {
-      // Memory values are disabled by default.
-      conf.setClass(
-          org.apache.hadoop.mapred.TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-          DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-      setUpCluster(conf);
-      runSleepJob(miniMRCluster.createJobConf());
-      verifyTestResults();
-    } finally {
-      tearDownCluster();
-    }
-  }
-
-  /**
-   * Test that verifies that configured values are reported correctly.
-   * 
-   * @throws Exception
-   */
-  public void testConfiguredMemoryValues()
-      throws Exception {
-    JobConf conf = new JobConf();
-    conf.setLong("totalVmemOnTT", 4 * 1024 * 1024 * 1024L);
-    conf.setLong("totalPmemOnTT", 2 * 1024 * 1024 * 1024L);
-    conf.setLong("mapSlotMemorySize", 1 * 512L);
-    conf.setLong("reduceSlotMemorySize", 1 * 1024L);
-
-    conf.setClass(
-        org.apache.hadoop.mapred.TaskTracker.MAPRED_TASKTRACKER_MEMORY_CALCULATOR_PLUGIN_PROPERTY,
-        DummyMemoryCalculatorPlugin.class, MemoryCalculatorPlugin.class);
-    conf.setLong(DummyMemoryCalculatorPlugin.MAXVMEM_TESTING_PROPERTY,
-        4 * 1024 * 1024 * 1024L);
-    conf.setLong(DummyMemoryCalculatorPlugin.MAXPMEM_TESTING_PROPERTY,
-        2 * 1024 * 1024 * 1024L);
-    conf.setLong(JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
-        512L);
-    conf.setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1024L);
-    
-    try {
-      setUpCluster(conf);
-      JobConf jobConf = miniMRCluster.createJobConf();
-      jobConf.setMemoryForMapTask(1 * 1024L);
-      jobConf.setMemoryForReduceTask(2 * 1024L);
-      runSleepJob(jobConf);
-      verifyTestResults();
-    } finally {
-      tearDownCluster();
-    }
-  }
-
-  /**
-   * Test that verifies that total memory values are calculated and reported
-   * correctly.
-   * 
-   * @throws Exception
-   */
-  public void testMemoryValuesOnLinux()
-      throws Exception {
-    if (!System.getProperty("os.name").startsWith("Linux")) {
-      return;
-    }
-
-    JobConf conf = new JobConf();
-    LinuxMemoryCalculatorPlugin plugin = new LinuxMemoryCalculatorPlugin();
-    conf.setLong("totalVmemOnTT", plugin.getVirtualMemorySize());
-    conf.setLong("totalPmemOnTT", plugin.getPhysicalMemorySize());
-
-    try {
-      setUpCluster(conf);
-      runSleepJob(miniMRCluster.createJobConf());
-      verifyTestResults();
-    } finally {
-      tearDownCluster();
-    }
-  }
-
-  private void setUpCluster(JobConf conf)
-                                throws Exception {
-    conf.setClass("mapred.jobtracker.taskScheduler",
-        TestTTMemoryReporting.FakeTaskScheduler.class, TaskScheduler.class);
-    conf.set("mapred.job.tracker.handler.count", "1");
-    miniMRCluster = new MiniMRCluster(1, "file:///", 3, null, null, conf);
-  }
-  
-  private void runSleepJob(JobConf conf) throws Exception {
-    String[] args = { "-m", "1", "-r", "1",
-                      "-mt", "10", "-rt", "10" };
-    ToolRunner.run(conf, new SleepJob(), args);
-  }
-
-  private void verifyTestResults() {
-    FakeTaskScheduler scheduler = 
-      (FakeTaskScheduler)miniMRCluster.getJobTrackerRunner().
-                              getJobTracker().getTaskScheduler();
-    assertTrue(scheduler.getFailureMessage(), scheduler.hasTestPassed());
-  }
-  
-  private void tearDownCluster() {
-    if (miniMRCluster != null) {
-      miniMRCluster.shutdown();
-    }
-  }
-}

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java?rev=1176835&r1=1176834&r2=1176835&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/util/TestProcfsBasedProcessTree.java Wed Sep 28 11:49:34 2011
@@ -22,6 +22,7 @@ import java.io.BufferedWriter;
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Random;
 import java.util.Vector;
 import java.util.regex.Matcher;
@@ -182,12 +183,12 @@ public class TestProcfsBasedProcessTree 
 
     LOG.info("Process-tree dump follows: \n" + processTreeDump);
     assertTrue("Process-tree dump doesn't start with a proper header",
-        processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME "
-            + "VMEM_USAGE(BYTES) FULL_CMD_LINE\n"));
+        processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
+        "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
+        "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
     for (int i = N; i >= 0; i--) {
-      String cmdLineDump =
-          "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\) [0-9]+ sh " + shellScript
-              + " " + i;
+      String cmdLineDump = "\\|- [0-9]+ [0-9]+ [0-9]+ [0-9]+ \\(sh\\)" +
+          " [0-9]+ [0-9]+ [0-9]+ [0-9]+ sh " + shellScript + " " + i;
       Pattern pat = Pattern.compile(cmdLineDump);
       Matcher mat = pat.matcher(processTreeDump);
       assertTrue("Process-tree dump doesn't contain the cmdLineDump of " + i
@@ -221,7 +222,10 @@ public class TestProcfsBasedProcessTree 
     String ppid;
     String pgrpId;
     String session;
-    String vmem;
+    String vmem = "0";
+    String rssmemPage = "0";
+    String utime = "0";
+    String stime = "0";
     
     public ProcessStatInfo(String[] statEntries) {
       pid = statEntries[0];
@@ -230,27 +234,35 @@ public class TestProcfsBasedProcessTree 
       pgrpId = statEntries[3];
       session = statEntries[4];
       vmem = statEntries[5];
+      if (statEntries.length > 6) {
+        rssmemPage = statEntries[6];
+      }
+      if (statEntries.length > 7) {
+        utime = statEntries[7];
+        stime = statEntries[8];
+      }
     }
     
     // construct a line that mimics the procfs stat file.
     // all unused numerical entries are set to 0.
     public String getStatLine() {
       return String.format("%s (%s) S %s %s %s 0 0 0" +
-                      " 0 0 0 0 0 0 0 0 0 0 0 0 0 %s 0 0 0" +
+                      " 0 0 0 0 %s %s 0 0 0 0 0 0 0 %s %s 0 0" +
                       " 0 0 0 0 0 0 0 0" +
                       " 0 0 0 0 0", 
-                      pid, name, ppid, pgrpId, session, vmem);
+                      pid, name, ppid, pgrpId, session,
+                      utime, stime, vmem, rssmemPage);
     }
   }
   
   /**
    * A basic test that creates a few process directories and writes
-   * stat files. Verifies that the virtual memory is correctly  
+   * stat files. Verifies that the cpu time and memory is correctly
    * computed.
    * @throws IOException if there was a problem setting up the
    *                      fake procfs directories or files.
    */
-  public void testVirtualMemoryForProcessTree() throws IOException {
+  public void testCpuAndMemoryForProcessTree() throws IOException {
 
     // test processes
     String[] pids = { "100", "200", "300", "400" };
@@ -265,13 +277,13 @@ public class TestProcfsBasedProcessTree 
       // assuming processes 100, 200, 300 are in tree and 400 is not.
       ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
       procInfos[0] = new ProcessStatInfo(new String[] 
-                                  {"100", "proc1", "1", "100", "100", "100000"});
+          {"100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
       procInfos[1] = new ProcessStatInfo(new String[] 
-                                  {"200", "proc2", "100", "100", "100", "200000"});
+          {"200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
       procInfos[2] = new ProcessStatInfo(new String[] 
-                                  {"300", "proc3", "200", "100", "100", "300000"});
+          {"300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
       procInfos[3] = new ProcessStatInfo(new String[] 
-                                  {"400", "proc4", "1", "400", "400", "400000"});
+          {"400", "proc4", "1", "400", "400", "400000", "400", "4000", "800"});
       
       writeStatFiles(procfsRootDir, pids, procInfos);
       
@@ -282,8 +294,36 @@ public class TestProcfsBasedProcessTree 
       processTree.getProcessTree();
       
       // verify cumulative memory
-      assertEquals("Cumulative memory does not match", 
-              Long.parseLong("600000"), processTree.getCumulativeVmem());
+      assertEquals("Cumulative virtual memory does not match", 600000L,
+                   processTree.getCumulativeVmem());
+
+      // verify rss memory
+      long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                        600L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      assertEquals("Cumulative rss memory does not match",
+                   cumuRssMem, processTree.getCumulativeRssmem());
+
+      // verify cumulative cpu time
+      long cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
+             7200L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
+      assertEquals("Cumulative cpu time does not match",
+                   cumuCpuTime, processTree.getCumulativeCpuTime());
+
+      // test the cpu time again to see if it cumulates
+      procInfos[0] = new ProcessStatInfo(new String[]
+          {"100", "proc1", "1", "100", "100", "100000", "100", "2000", "300"});
+      procInfos[1] = new ProcessStatInfo(new String[]
+          {"200", "proc2", "100", "100", "100", "200000", "200", "3000", "500"});
+      writeStatFiles(procfsRootDir, pids, procInfos);
+
+      // build the process tree.
+      processTree.getProcessTree();
+
+      // verify cumulative cpu time again
+      cumuCpuTime = ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS > 0 ?
+             9400L * ProcfsBasedProcessTree.JIFFY_LENGTH_IN_MILLIS : 0L;
+      assertEquals("Cumulative cpu time does not match",
+                   cumuCpuTime, processTree.getCumulativeCpuTime());
     } finally {
       FileUtil.fullyDelete(procfsRootDir);
     }
@@ -295,7 +335,7 @@ public class TestProcfsBasedProcessTree 
    * @throws IOException if there was a problem setting up the
    *                      fake procfs directories or files.
    */
-  public void testVMemForOlderProcesses() throws IOException {
+  public void testMemForOlderProcesses() throws IOException {
     // initial list of processes
     String[] pids = { "100", "200", "300", "400" };
     // create the fake procfs root directory. 
@@ -309,13 +349,13 @@ public class TestProcfsBasedProcessTree 
       // assuming 100, 200 and 400 are in tree, 300 is not.
       ProcessStatInfo[] procInfos = new ProcessStatInfo[4];
       procInfos[0] = new ProcessStatInfo(new String[] 
-                                  {"100", "proc1", "1", "100", "100", "100000"});
+                        {"100", "proc1", "1", "100", "100", "100000", "100"});
       procInfos[1] = new ProcessStatInfo(new String[] 
-                                  {"200", "proc2", "100", "100", "100", "200000"});
+                        {"200", "proc2", "100", "100", "100", "200000", "200"});
       procInfos[2] = new ProcessStatInfo(new String[] 
-                                  {"300", "proc3", "1", "300", "300", "300000"});
+                        {"300", "proc3", "1", "300", "300", "300000", "300"});
       procInfos[3] = new ProcessStatInfo(new String[] 
-                                  {"400", "proc4", "100", "100", "100", "400000"});
+                        {"400", "proc4", "100", "100", "100", "400000", "400"});
       
       writeStatFiles(procfsRootDir, pids, procInfos);
       
@@ -326,51 +366,69 @@ public class TestProcfsBasedProcessTree 
       processTree.getProcessTree();
       
       // verify cumulative memory
-      assertEquals("Cumulative memory does not match", 
-              Long.parseLong("700000"), processTree.getCumulativeVmem());
-      
+      assertEquals("Cumulative memory does not match",
+                   700000L, processTree.getCumulativeVmem());
+
       // write one more process as child of 100.
       String[] newPids = { "500" };
       setupPidDirs(procfsRootDir, newPids);
       
       ProcessStatInfo[] newProcInfos = new ProcessStatInfo[1];
       newProcInfos[0] = new ProcessStatInfo(new String[]
-                             {"500", "proc5", "100", "100", "100", "500000"});
+                      {"500", "proc5", "100", "100", "100", "500000", "500"});
       writeStatFiles(procfsRootDir, newPids, newProcInfos);
       
-      // check vmem includes the new process.
+      // check memory includes the new process.
       processTree.getProcessTree();
-      assertEquals("Cumulative memory does not include new process",
-              Long.parseLong("1200000"), processTree.getCumulativeVmem());
+      assertEquals("Cumulative vmem does not include new process",
+                   1200000L, processTree.getCumulativeVmem());
+      long cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                        1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      assertEquals("Cumulative rssmem does not include new process",
+                   cumuRssMem, processTree.getCumulativeRssmem());
       
       // however processes older than 1 iteration will retain the older value
-      assertEquals("Cumulative memory shouldn't have included new process",
-              Long.parseLong("700000"), processTree.getCumulativeVmem(1));
-      
+      assertEquals("Cumulative vmem shouldn't have included new process",
+                   700000L, processTree.getCumulativeVmem(1));
+      cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                   700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      assertEquals("Cumulative rssmem shouldn't have included new process",
+                   cumuRssMem, processTree.getCumulativeRssmem(1));
+
       // one more process
       newPids = new String[]{ "600" };
       setupPidDirs(procfsRootDir, newPids);
       
       newProcInfos = new ProcessStatInfo[1];
       newProcInfos[0] = new ProcessStatInfo(new String[]
-                                     {"600", "proc6", "100", "100", "100", "600000"});
+                      {"600", "proc6", "100", "100", "100", "600000", "600"});
       writeStatFiles(procfsRootDir, newPids, newProcInfos);
 
       // refresh process tree
       processTree.getProcessTree();
       
       // processes older than 2 iterations should be same as before.
-      assertEquals("Cumulative memory shouldn't have included new processes",
-          Long.parseLong("700000"), processTree.getCumulativeVmem(2));
-      
+      assertEquals("Cumulative vmem shouldn't have included new processes",
+                   700000L, processTree.getCumulativeVmem(2));
+      cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                   700L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      assertEquals("Cumulative rssmem shouldn't have included new processes",
+                   cumuRssMem, processTree.getCumulativeRssmem(2));
+
       // processes older than 1 iteration should not include new process,
       // but include process 500
-      assertEquals("Cumulative memory shouldn't have included new processes",
-          Long.parseLong("1200000"), processTree.getCumulativeVmem(1));
-      
+      assertEquals("Cumulative vmem shouldn't have included new processes",
+                   1200000L, processTree.getCumulativeVmem(1));
+      cumuRssMem = ProcfsBasedProcessTree.PAGE_SIZE > 0 ?
+                   1200L * ProcfsBasedProcessTree.PAGE_SIZE : 0L;
+      assertEquals("Cumulative rssmem shouldn't have included new processes",
+                   cumuRssMem, processTree.getCumulativeRssmem(1));
+
       // no processes older than 3 iterations, this should be 0
       assertEquals("Getting non-zero vmem for processes older than 3 iterations",
                     0L, processTree.getCumulativeVmem(3));
+      assertEquals("Getting non-zero rssmem for processes older than 3 iterations",
+                    0L, processTree.getCumulativeRssmem(3));
     } finally {
       FileUtil.fullyDelete(procfsRootDir);
     }
@@ -395,24 +453,18 @@ public class TestProcfsBasedProcessTree 
       int numProcesses = pids.length;
       // Processes 200, 300, 400 and 500 are descendants of 100. 600 is not.
       ProcessStatInfo[] procInfos = new ProcessStatInfo[numProcesses];
-      procInfos[0] =
-          new ProcessStatInfo(new String[] { "100", "proc1", "1", "100",
-              "100", "100000" });
-      procInfos[1] =
-          new ProcessStatInfo(new String[] { "200", "proc2", "100", "100",
-              "100", "200000" });
-      procInfos[2] =
-          new ProcessStatInfo(new String[] { "300", "proc3", "200", "100",
-              "100", "300000" });
-      procInfos[3] =
-          new ProcessStatInfo(new String[] { "400", "proc4", "200", "100",
-              "100", "400000" });
-      procInfos[4] =
-          new ProcessStatInfo(new String[] { "500", "proc5", "400", "100",
-              "100", "400000" });
-      procInfos[5] =
-          new ProcessStatInfo(new String[] { "600", "proc6", "1", "1", "1",
-              "400000" });
+      procInfos[0] = new ProcessStatInfo(new String[] {
+          "100", "proc1", "1", "100", "100", "100000", "100", "1000", "200"});
+      procInfos[1] = new ProcessStatInfo(new String[] {
+          "200", "proc2", "100", "100", "100", "200000", "200", "2000", "400"});
+      procInfos[2] = new ProcessStatInfo(new String[] {
+          "300", "proc3", "200", "100", "100", "300000", "300", "3000", "600"});
+      procInfos[3] = new ProcessStatInfo(new String[] {
+          "400", "proc4", "200", "100", "100", "400000", "400", "4000", "800"});
+      procInfos[4] = new ProcessStatInfo(new String[] {
+          "500", "proc5", "400", "100", "100", "400000", "400", "4000", "800"});
+      procInfos[5] = new ProcessStatInfo(new String[] {
+          "600", "proc6", "1", "1", "1", "400000", "400", "4000", "800"});
 
       String[] cmdLines = new String[numProcesses];
       cmdLines[0] = "proc1 arg1 arg2";
@@ -435,15 +487,17 @@ public class TestProcfsBasedProcessTree 
 
       LOG.info("Process-tree dump follows: \n" + processTreeDump);
       assertTrue("Process-tree dump doesn't start with a proper header",
-          processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME "
-              + "VMEM_USAGE(BYTES) FULL_CMD_LINE\n"));
+          processTreeDump.startsWith("\t|- PID PPID PGRPID SESSID CMD_NAME " +
+          "USER_MODE_TIME(MILLIS) SYSTEM_TIME(MILLIS) VMEM_USAGE(BYTES) " +
+          "RSSMEM_USAGE(PAGES) FULL_CMD_LINE\n"));
       for (int i = 0; i < 5; i++) {
         ProcessStatInfo p = procInfos[i];
         assertTrue(
             "Process-tree dump doesn't contain the cmdLineDump of process "
                 + p.pid, processTreeDump.contains("\t|- " + p.pid + " "
                 + p.ppid + " " + p.pgrpId + " " + p.session + " (" + p.name
-                + ") " + p.vmem + " " + cmdLines[i]));
+                + ") " + p.utime + " " + p.stime + " " + p.vmem + " "
+                + p.rssmemPage + " " + cmdLines[i]));
       }
 
       // 600 should not be in the dump
@@ -452,7 +506,7 @@ public class TestProcfsBasedProcessTree 
           "Process-tree dump shouldn't contain the cmdLineDump of process "
               + p.pid, processTreeDump.contains("\t|- " + p.pid + " " + p.ppid
               + " " + p.pgrpId + " " + p.session + " (" + p.name + ") "
-              + p.vmem + " " + cmdLines[5]));
+              + p.utime + " " + p.stime + " " + p.vmem + " " + cmdLines[5]));
     } finally {
       FileUtil.fullyDelete(procfsRootDir);
     }