Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/05/16 18:13:48 UTC

svn commit: r775492 - in /hadoop/core/trunk: ./ src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/ src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/

Author: ddas
Date: Sat May 16 16:13:47 2009
New Revision: 775492

URL: http://svn.apache.org/viewvc?rev=775492&view=rev
Log:
HADOOP-5582. Fixes a problem in Hadoop Vaidya to do with reading counters from job history files. Contributed by Suhas Gogate.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatisticsInterface.java
    hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/vaidya.sh

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=775492&r1=775491&r2=775492&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat May 16 16:13:47 2009
@@ -608,6 +608,9 @@
     HADOOP-5855. Fix javac warnings for DisallowedDatanodeException and
     UnsupportedActionException.  (szetszwo)
 
+    HADOOP-5582. Fixes a problem in Hadoop Vaidya to do with reading
+    counters from job history files. (Suhas Gogate via ddas)
+
 Release 0.20.1 - Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java?rev=775492&r1=775491&r2=775492&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java (original)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/ReadingHDFSFilesAsSideEffect.java Sat May 16 16:13:47 2009
@@ -53,6 +53,12 @@
      * Set the this._job
      */
     this._job = job;
+        
+    /*
+     * Read the Normalization Factor
+     */
+    double normF = getInputElementDoubleValue("NormalizationFactor", 2.0);
+    
     
     /*
      * Calculate and return the impact
@@ -64,13 +70,20 @@
      * If side effect HDFS bytes read are >= twice map input bytes impact is treated as
      * maximum.
      */
+    if(job.getLongValue(JobKeys.MAP_INPUT_BYTES) == 0 && job.getLongValue(JobKeys.HDFS_BYTES_READ) != 0) {
+      return (double)1;
+    }
+
+    if (job.getLongValue(JobKeys.HDFS_BYTES_READ) == 0) {
+      return (double)0;
+    }
     
     this._impact = (job.getLongValue(JobKeys.HDFS_BYTES_READ) / job.getLongValue(JobKeys.MAP_INPUT_BYTES));
-    if (this._impact >= 2.0) {
+    if (this._impact >= normF) {
       this._impact = 1;
     }
     else  {
-      this._impact -= 1;
+      this._impact = this._impact/normF;
     }
     
     return this._impact;
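
For readers skimming the hunk above, the new impact rule condenses to a small standalone method. This is only an illustrative sketch: the committed test reads the two byte counts through job.getLongValue(JobKeys.HDFS_BYTES_READ) and job.getLongValue(JobKeys.MAP_INPUT_BYTES), obtains the normalization factor via getInputElementDoubleValue("NormalizationFactor", 2.0), and divides the two long counters directly (the explicit double cast below is only a readability aid).

    // Illustrative sketch of the normalized side-effect impact rule added above.
    static double sideEffectImpact(long hdfsBytesRead, long mapInputBytes, double normF) {
      if (mapInputBytes == 0 && hdfsBytesRead != 0) {
        return 1.0;   // all HDFS reads are side-effect reads: maximum impact
      }
      if (hdfsBytesRead == 0) {
        return 0.0;   // nothing read from HDFS: no impact
      }
      double ratio = (double) hdfsBytesRead / mapInputBytes;
      return (ratio >= normF) ? 1.0 : ratio / normF;   // scale by normF, cap at 1.0
    }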

Modified: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml?rev=775492&r1=775491&r2=775492&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml (original)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/postexdiagnosis/tests/postex_diagnosis_tests.xml Sat May 16 16:13:47 2009
@@ -86,6 +86,7 @@
   <SuccessThreshold><![CDATA[0.05]]></SuccessThreshold>
   <Prescription><![CDATA[default advice]]></Prescription>
   <InputElement>
+    <NormalizationFactor>2.0</NormalizationFactor>
   </InputElement>
 </DiagnosticTest>
 

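The 2.0 configured here mirrors the default passed to getInputElementDoubleValue("NormalizationFactor", 2.0) in ReadingHDFSFilesAsSideEffect.java above, so existing test configurations that omit the new element keep the old threshold. A rough sketch of what such a lookup amounts to (the accessor getInputElementStringValue and the exact behaviour of DiagnosticTest are assumptions, not part of this patch):

    // Hypothetical helper shape, for illustration only: read the named child of
    // <InputElement> as a double, falling back to the supplied default when the
    // element is missing or empty.
    double getInputElementDoubleValue(String elementName, double defaultValue) {
      String text = getInputElementStringValue(elementName);  // assumed accessor
      return (text == null || text.isEmpty()) ? defaultValue
                                              : Double.parseDouble(text);
    }
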
Modified: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java?rev=775492&r1=775491&r2=775492&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java (original)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatistics.java Sat May 16 16:13:47 2009
@@ -83,21 +83,34 @@
    * Get Job Counters of type long
    */
   public long getLongValue(Enum key) {
-    return Long.parseLong(this._job.get(key));
+    if (this._job.get(key) == null) {
+      return (long)0;
+    }
+    else {
+      return Long.parseLong(this._job.get(key));
+    }
   }
   
   /*
    * Get job Counters of type Double
    */
   public double getDoubleValue(Enum key) {
-    return Double.parseDouble(this._job.get(key));
+    if (this._job.get(key) == null) {
+      return (double)0;
+    } else {
+      return Double.parseDouble(this._job.get(key));
+    }
   }
   
   /* 
    * Get Job Counters of type String
    */
   public String getStringValue(Enum key) {
-    return this._job.get(key);
+	if (this._job.get(key) == null) {
+	  return "";
+	} else {
+      return this._job.get(key);
+	}
   }
   
   /*
@@ -139,7 +152,7 @@
     this._jobConf = jobConf;
     this._jobInfo = jobInfo;
     this._job = new Hashtable<Enum, String>();
-    populate_Job(this._job, this._jobInfo.getValues());
+    populate_Job(this._job, this._jobInfo.getValues());  
     populate_MapReduceTaskLists(this._mapTaskList, this._reduceTaskList, this._jobInfo.getAllTasks());
   }
   
@@ -175,6 +188,7 @@
         Map.Entry<JobHistory.Keys, String> mtc = kv.next();
         JobHistory.Keys key = mtc.getKey();
         String value = mtc.getValue();
+        //System.out.println("JobHistory.MapKeys."+key+": "+value);
         switch (key) {
         case TASKID: mapT.setValue(MapTaskKeys.TASK_ID, value); break;
         case TASK_ATTEMPT_ID: mapT.setValue(MapTaskKeys.ATTEMPT_ID, value); break;
@@ -184,12 +198,16 @@
         case START_TIME: mapT.setValue(MapTaskKeys.START_TIME, value); break;
         case FINISH_TIME: mapT.setValue(MapTaskKeys.FINISH_TIME, value); break;
         case SPLITS: mapT.setValue(MapTaskKeys.SPLITS, value); break;
+        case TRACKER_NAME: mapT.setValue(MapTaskKeys.TRACKER_NAME, value); break;
+        case STATE_STRING: mapT.setValue(MapTaskKeys.STATE_STRING, value); break;
+        case HTTP_PORT: mapT.setValue(MapTaskKeys.HTTP_PORT, value); break;
+        case ERROR: mapT.setValue(MapTaskKeys.ERROR, value); break;
         case COUNTERS:
           value.concat(",");
           parseAndAddMapTaskCounters(mapT, value);
           mapTaskList.add(mapT);
           break;
-        default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
+        default: System.out.println("JobHistory.MapKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR MAP COUNTERS");
           break;
         }
       }
@@ -214,6 +232,7 @@
         Map.Entry<JobHistory.Keys, String> rtc = kv.next();
         JobHistory.Keys key = rtc.getKey();
         String value = rtc.getValue();
+        //System.out.println("JobHistory.ReduceKeys."+key+": "+value);
         switch (key) {
         case TASKID: reduceT.setValue(ReduceTaskKeys.TASK_ID, value); break;
         case TASK_ATTEMPT_ID: reduceT.setValue(ReduceTaskKeys.ATTEMPT_ID, value); break;
@@ -224,19 +243,24 @@
         case FINISH_TIME: reduceT.setValue(ReduceTaskKeys.FINISH_TIME, value); break;
         case SHUFFLE_FINISHED: reduceT.setValue(ReduceTaskKeys.SHUFFLE_FINISH_TIME, value); break;
         case SORT_FINISHED: reduceT.setValue(ReduceTaskKeys.SORT_FINISH_TIME, value); break;
+        case SPLITS: reduceT.setValue(ReduceTaskKeys.SPLITS, value); break;
+        case TRACKER_NAME: reduceT.setValue(ReduceTaskKeys.TRACKER_NAME, value); break;
+        case STATE_STRING: reduceT.setValue(ReduceTaskKeys.STATE_STRING, value); break;
+        case HTTP_PORT: reduceT.setValue(ReduceTaskKeys.HTTP_PORT, value); break;
         case COUNTERS:
           value.concat(",");
           parseAndAddReduceTaskCounters(reduceT, value);
           reduceTaskList.add(reduceT);
           break;
-        default: System.out.println("JobHistory.Keys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
+        default: System.out.println("JobHistory.ReduceKeys."+key+" : NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE COUNTERS");
           break;
         }
         
         // Add number of task attempts
         reduceT.setValue(ReduceTaskKeys.NUM_ATTEMPTS, (new Integer(task.getTaskAttempts().size())).toString());
       }
-      } else if (task.get(Keys.TASK_TYPE).equals("CLEANUP")) {
+      } else if (task.get(Keys.TASK_TYPE).equals("CLEANUP") ||
+                 task.get(Keys.TASK_TYPE).equals("SETUP")) {
         //System.out.println("INFO: IGNORING TASK TYPE : "+task.get(Keys.TASK_TYPE));
       } else {
         System.out.println("UNKNOWN TASK TYPE : "+task.get(Keys.TASK_TYPE));
@@ -275,6 +299,7 @@
       Map.Entry<JobHistory.Keys, String> entry = (Map.Entry<JobHistory.Keys, String>) kv.next();
       JobHistory.Keys key = entry.getKey();
       String value = entry.getValue();
+      //System.out.println("JobHistory.JobKeys."+key+": "+value);
       switch (key) {
       case JOBTRACKERID: job.put(JobKeys.JOBTRACKERID, value); break;
       //case START_TIME: job.put(JobKeys., value); break;
@@ -292,6 +317,7 @@
       case FINISHED_MAPS: job.put(JobKeys.FINISHED_MAPS, value); break;
       case FINISHED_REDUCES: job.put(JobKeys.FINISHED_REDUCES, value); break;
       case JOB_STATUS: job.put(JobKeys.STATUS, value); break;
+      case JOB_PRIORITY: job.put(JobKeys.JOB_PRIORITY, value); break;
       case COUNTERS:
         value.concat(",");
         parseAndAddJobCounters(job, value);
@@ -306,47 +332,59 @@
    * Parse and add the job counters
    */
   private void parseAndAddJobCounters(Hashtable<Enum, String> job, String counters) throws ParseException {
-    Matcher m = _pattern.matcher(counters);
-    while(m.find()){
-      String ctuple = m.group(0);
-      //String ctuple = c1tuple.substring(0, c1tuple.length()-1);
-      String []parts = ctuple.split(":");
-      if (parts[0].equals("File Systems.Local bytes read")) {
-        job.put(JobKeys.LOCAL_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.Local bytes written")) {
-        job.put(JobKeys.LOCAL_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes read")) {
-        job.put(JobKeys.HDFS_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes written")) {
-        job.put(JobKeys.HDFS_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("Job Counters .Launched map tasks")) {
-        job.put(JobKeys.LAUNCHED_MAPS, parts[1]);
-      } else if (parts[0].equals("Job Counters .Launched reduce tasks")) {
-        job.put(JobKeys.LAUNCHED_REDUCES, parts[1]);
-      } else if (parts[0].equals("Job Counters .Data-local map tasks")) {
-        job.put(JobKeys.DATALOCAL_MAPS, parts[1]);
-      } else if (parts[0].equals("Job Counters .Rack-local map tasks")) {
-        job.put(JobKeys.RACKLOCAL_MAPS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
-        job.put(JobKeys.MAP_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
-        job.put(JobKeys.MAP_OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
-        job.put(JobKeys.MAP_INPUT_BYTES, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
-        job.put(JobKeys.MAP_OUTPUT_BYTES, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
-        job.put(JobKeys.COMBINE_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
-        job.put(JobKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
-        job.put(JobKeys.REDUCE_INPUT_GROUPS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
-        job.put(JobKeys.REDUCE_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
-        job.put(JobKeys.REDUCE_OUTPUT_RECORDS, parts[1]);
-      } else {
-        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
+    Counters cnt = Counters.fromEscapedCompactString(counters);
+    for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
+      Counters.Group grp = grps.next();
+      //String groupname = "<" + grp.getName() + ">::<" + grp.getDisplayName() + ">";
+      for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
+        Counters.Counter counter = mycounters.next();
+        //String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
+        //System.out.println("groupName:"+groupname+",countername: "+countername);
+        String countername = grp.getDisplayName()+"."+counter.getDisplayName();
+        String value = (new Long(counter.getValue())).toString();
+        String[] parts = {countername,value};
+        //System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
+        if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
+          job.put(JobKeys.LOCAL_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
+          job.put(JobKeys.LOCAL_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
+          job.put(JobKeys.HDFS_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
+          job.put(JobKeys.HDFS_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("Job Counters .Launched map tasks")) {
+          job.put(JobKeys.LAUNCHED_MAPS, parts[1]);
+        } else if (parts[0].equals("Job Counters .Launched reduce tasks")) {
+          job.put(JobKeys.LAUNCHED_REDUCES, parts[1]);
+        } else if (parts[0].equals("Job Counters .Data-local map tasks")) {
+          job.put(JobKeys.DATALOCAL_MAPS, parts[1]);
+        } else if (parts[0].equals("Job Counters .Rack-local map tasks")) {
+          job.put(JobKeys.RACKLOCAL_MAPS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
+          job.put(JobKeys.MAP_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
+          job.put(JobKeys.MAP_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
+          job.put(JobKeys.MAP_INPUT_BYTES, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
+          job.put(JobKeys.MAP_OUTPUT_BYTES, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
+          job.put(JobKeys.COMBINE_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
+          job.put(JobKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
+          job.put(JobKeys.REDUCE_INPUT_GROUPS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
+          job.put(JobKeys.REDUCE_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
+          job.put(JobKeys.REDUCE_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
+          job.put(JobKeys.SPILLED_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
+          job.put(JobKeys.SHUFFLE_BYTES, parts[1]);
+        } else {
+          System.out.println("JobCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR");
+        }
       }
     }  
   }
@@ -354,67 +392,89 @@
   /*
    * Parse and add the Map task counters
    */
-  private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) {
-    Matcher m = _pattern.matcher(counters);
-    while(m.find()){
-      String ctuple = m.group(0);
-      //String ctuple = c1tuple.substring(0, c1tuple.length()-1);
-      String []parts = ctuple.split(":");
-      if (parts[0].equals("File Systems.Local bytes read")) {
-        mapTask.setValue(MapTaskKeys.LOCAL_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.Local bytes written")) {
-        mapTask.setValue(MapTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes read")) {
-        mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes written")) {
-        mapTask.setValue(MapTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
-        mapTask.setValue(MapTaskKeys.INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
-        mapTask.setValue(MapTaskKeys.OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
-        mapTask.setValue(MapTaskKeys.INPUT_BYTES, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
-        mapTask.setValue(MapTaskKeys.OUTPUT_BYTES, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
-        mapTask.setValue(MapTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
-        mapTask.setValue(MapTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
-      } else {
-        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
-      }
-    }    
+  private void parseAndAddMapTaskCounters(MapTaskStatistics mapTask, String counters) throws ParseException {
+    Counters cnt = Counters.fromEscapedCompactString(counters);
+    for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
+      Counters.Group grp = grps.next();
+      //String groupname = "<" + grp.getName() + ">::<" + grp.getDisplayName() + ">";
+      for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
+        Counters.Counter counter = mycounters.next();
+        //String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
+        //System.out.println("groupName:"+groupname+",countername: "+countername);
+        String countername = grp.getDisplayName()+"."+counter.getDisplayName();
+        String value = (new Long(counter.getValue())).toString();
+        String[] parts = {countername,value};
+        //System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
+        if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
+          mapTask.setValue(MapTaskKeys.LOCAL_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
+          mapTask.setValue(MapTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
+          mapTask.setValue(MapTaskKeys.HDFS_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
+          mapTask.setValue(MapTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map input records")) {
+          mapTask.setValue(MapTaskKeys.INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map output records")) {
+          mapTask.setValue(MapTaskKeys.OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map input bytes")) {
+          mapTask.setValue(MapTaskKeys.INPUT_BYTES, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Map output bytes")) {
+          mapTask.setValue(MapTaskKeys.OUTPUT_BYTES, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
+          mapTask.setValue(MapTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
+          mapTask.setValue(MapTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
+          mapTask.setValue(MapTaskKeys.SPILLED_RECORDS, parts[1]);
+        } else {
+          System.out.println("MapCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
+        }
+      }    
+    }
   }
   
   /*
    * Parse and add the reduce task counters
    */
-  private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) {
-    Matcher m = _pattern.matcher(counters);
-    while(m.find()){
-      String ctuple = m.group(0);
-      //String ctuple = c1tuple.substring(0, c1tuple.length()-1);
-      String []parts = ctuple.split(":");
-      if (parts[0].equals("File Systems.Local bytes read")) {
-        reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.Local bytes written")) {
-        reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes read")) {
-        reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, parts[1]);
-      } else if (parts[0].equals("File Systems.HDFS bytes written")) {
-        reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
-        reduceTask.setValue(ReduceTaskKeys.INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
-        reduceTask.setValue(ReduceTaskKeys.OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
-        reduceTask.setValue(ReduceTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
-        reduceTask.setValue(ReduceTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
-      } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
-        reduceTask.setValue(ReduceTaskKeys.INPUT_GROUPS, parts[1]);
-      } else {
-        System.out.println("Pattern:<"+ctuple+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR MAP TASK");
+  private void parseAndAddReduceTaskCounters(ReduceTaskStatistics reduceTask, String counters) throws ParseException {
+    Counters cnt = Counters.fromEscapedCompactString(counters);
+    for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
+      Counters.Group grp = grps.next();
+      //String groupname = "<" + grp.getName() + ">::<" + grp.getDisplayName() + ">";
+      for (java.util.Iterator<Counters.Counter> mycounters = grp.iterator(); mycounters.hasNext(); ) {
+        Counters.Counter counter = mycounters.next();
+        //String countername = "<"+counter.getName()+">::<"+counter.getDisplayName()+">::<"+counter.getValue()+">";
+        //System.out.println("groupName:"+groupname+",countername: "+countername);
+        String countername = grp.getDisplayName()+"."+counter.getDisplayName();
+        String value = (new Long(counter.getValue())).toString();
+        String[] parts = {countername,value};
+        //System.out.println("part0:"+parts[0]+",:part1 "+parts[1]);
+        if (parts[0].equals("FileSystemCounters.FILE_BYTES_READ")) {
+          reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.FILE_BYTES_WRITTEN")) {
+          reduceTask.setValue(ReduceTaskKeys.LOCAL_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_READ")) {
+          reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_READ, parts[1]);
+        } else if (parts[0].equals("FileSystemCounters.HDFS_BYTES_WRITTEN")) {
+          reduceTask.setValue(ReduceTaskKeys.HDFS_BYTES_WRITTEN, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce input records")) {
+          reduceTask.setValue(ReduceTaskKeys.INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce output records")) {
+          reduceTask.setValue(ReduceTaskKeys.OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine input records")) {
+          reduceTask.setValue(ReduceTaskKeys.COMBINE_INPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Combine output records")) {
+          reduceTask.setValue(ReduceTaskKeys.COMBINE_OUTPUT_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce input groups")) {
+          reduceTask.setValue(ReduceTaskKeys.INPUT_GROUPS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Spilled Records")) {
+          reduceTask.setValue(ReduceTaskKeys.SPILLED_RECORDS, parts[1]);
+        } else if (parts[0].equals("Map-Reduce Framework.Reduce shuffle bytes")) {
+          reduceTask.setValue(ReduceTaskKeys.SHUFFLE_BYTES, parts[1]);
+        } else {
+          System.out.println("ReduceCounterKey:<"+parts[0]+"> ==> NOT INCLUDED IN PERFORMANCE ADVISOR REDUCE TASK");
+        }
       }
     }    
   }
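
Two themes run through the JobStatistics.java hunks above. First, the getLongValue/getDoubleValue/getStringValue accessors now return 0 (or "") when a key was never populated, instead of handing null to Long.parseLong or Double.parseDouble. Second, all three parseAndAdd*Counters methods drop the regex match over the raw counters string in favour of Counters.fromEscapedCompactString and keys of the form "<group display name>.<counter display name>". A condensed sketch of that shared loop, using only calls that appear in the hunks above (the key mapping is abbreviated to two cases):

    // Condensed sketch of the counter-parsing loop shared by the job, map and
    // reduce parsers above; unknown keys are simply logged in the real code.
    Counters cnt = Counters.fromEscapedCompactString(counters);        // may throw ParseException
    for (java.util.Iterator<Counters.Group> grps = cnt.iterator(); grps.hasNext(); ) {
      Counters.Group grp = grps.next();
      for (java.util.Iterator<Counters.Counter> ctrs = grp.iterator(); ctrs.hasNext(); ) {
        Counters.Counter counter = ctrs.next();
        String name  = grp.getDisplayName() + "." + counter.getDisplayName();
        String value = Long.toString(counter.getValue());
        if (name.equals("FileSystemCounters.HDFS_BYTES_READ")) {
          job.put(JobKeys.HDFS_BYTES_READ, value);
        } else if (name.equals("Map-Reduce Framework.Map input bytes")) {
          job.put(JobKeys.MAP_INPUT_BYTES, value);
        } // ...remaining keys are mapped the same way
      }
    }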

Modified: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatisticsInterface.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatisticsInterface.java?rev=775492&r1=775491&r2=775492&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatisticsInterface.java (original)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/statistics/job/JobStatisticsInterface.java Sat May 16 16:13:47 2009
@@ -98,7 +98,7 @@
     HDFS_BYTES_WRITTEN, LOCAL_BYTES_READ, LOCAL_BYTES_WRITTEN, COMBINE_OUTPUT_RECORDS,
     COMBINE_INPUT_RECORDS, REDUCE_INPUT_GROUPS, REDUCE_INPUT_RECORDS, REDUCE_OUTPUT_RECORDS,
     MAP_INPUT_RECORDS, MAP_OUTPUT_RECORDS, MAP_INPUT_BYTES, MAP_OUTPUT_BYTES, MAP_HDFS_BYTES_WRITTEN,
-    JOBCONF
+    JOBCONF, JOB_PRIORITY, SHUFFLE_BYTES, SPILLED_RECORDS
    }
   
   /**
@@ -108,7 +108,7 @@
     TASK_ID, TASK_TYPE, START_TIME, STATUS, FINISH_TIME, HDFS_BYTES_READ, HDFS_BYTES_WRITTEN,
     LOCAL_BYTES_READ, LOCAL_BYTES_WRITTEN, COMBINE_OUTPUT_RECORDS, COMBINE_INPUT_RECORDS, 
     OUTPUT_RECORDS, INPUT_RECORDS, INPUT_BYTES, OUTPUT_BYTES, NUM_ATTEMPTS, ATTEMPT_ID,
-    HOSTNAME, SPLITS
+    HOSTNAME, SPLITS, SPILLED_RECORDS, TRACKER_NAME, STATE_STRING, HTTP_PORT, ERROR
   }
   
   /**
@@ -119,6 +119,7 @@
     TASK_ID, TASK_TYPE, START_TIME, STATUS, FINISH_TIME, HDFS_BYTES_READ, HDFS_BYTES_WRITTEN,
     LOCAL_BYTES_READ, LOCAL_BYTES_WRITTEN, COMBINE_OUTPUT_RECORDS, COMBINE_INPUT_RECORDS, 
     OUTPUT_RECORDS, INPUT_RECORDS, NUM_ATTEMPTS, ATTEMPT_ID, HOSTNAME, SHUFFLE_FINISH_TIME,
-    SORT_FINISH_TIME, INPUT_GROUPS
+    SORT_FINISH_TIME, INPUT_GROUPS, TRACKER_NAME, STATE_STRING, HTTP_PORT, SPLITS, SHUFFLE_BYTES, 
+    SPILLED_RECORDS
   }
 }
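
With the keys added above, the counters newly parsed in JobStatistics.java become reachable through its null-safe getters, for example (jobStats is a hypothetical JobStatistics instance):

    // Illustrative usage: a counter missing from the job history yields 0 / ""
    // rather than an exception, per the null checks added to the getters above.
    long spilledRecords = jobStats.getLongValue(JobKeys.SPILLED_RECORDS);
    long shuffleBytes   = jobStats.getLongValue(JobKeys.SHUFFLE_BYTES);
    String priority     = jobStats.getStringValue(JobKeys.JOB_PRIORITY);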

Modified: hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/vaidya.sh
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/vaidya.sh?rev=775492&r1=775491&r2=775492&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/vaidya.sh (original)
+++ hadoop/core/trunk/src/contrib/vaidya/src/java/org/apache/hadoop/vaidya/vaidya.sh Sat May 16 16:13:47 2009
@@ -44,4 +44,4 @@
 
 hadoopVersion=`$HADOOP_HOME/bin/hadoop version | awk 'BEGIN { RS = "" ; FS = "\n" } ; { print $1 }' | awk '{print $2}'`
 
-$JAVA_HOME/bin/java -classpath $HADOOP_HOME/hadoop-${hadoopVersion}-core.jar:$HADOOP_HOME/contrib/vaidya/hadoop-${hadoopVersion}-vaidya.jar:$HADOOP_HOME/lib/commons-logging-1.0.4.jar:${CLASSPATH} org.apache.hadoop.vaidya.postexdiagnosis.PostExPerformanceDiagnoser $@
+$JAVA_HOME/bin/java -Xmx1024m -classpath $HADOOP_HOME/hadoop-${hadoopVersion}-core.jar:$HADOOP_HOME/contrib/vaidya/hadoop-${hadoopVersion}-vaidya.jar:$HADOOP_HOME/lib/commons-logging-1.0.4.jar:${CLASSPATH} org.apache.hadoop.vaidya.postexdiagnosis.PostExPerformanceDiagnoser $@