Posted to commits@pig.apache.org by an...@apache.org on 2014/05/28 17:27:23 UTC

svn commit: r1598042 - in /pig/branches/branch-0.13: CHANGES.txt src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java test/org/apache/pig/test/TestMRJobStats.java

Author: aniket486
Date: Wed May 28 15:27:23 2014
New Revision: 1598042

URL: http://svn.apache.org/r1598042
Log:
PIG-3958: TestMRJobStats is broken in 0.13 and trunk (aniket486)

Modified:
    pig/branches/branch-0.13/CHANGES.txt
    pig/branches/branch-0.13/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
    pig/branches/branch-0.13/test/org/apache/pig/test/TestMRJobStats.java

Modified: pig/branches/branch-0.13/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.13/CHANGES.txt?rev=1598042&r1=1598041&r2=1598042&view=diff
==============================================================================
--- pig/branches/branch-0.13/CHANGES.txt (original)
+++ pig/branches/branch-0.13/CHANGES.txt Wed May 28 15:27:23 2014
@@ -143,6 +143,8 @@ PIG-3882: Multiquery off mode execution 
  
 BUG FIXES
 
+PIG-3958: TestMRJobStats is broken in 0.13 and trunk (aniket486)
+
 PIG-3949: HiveColumnarStorage compile failure with Hive 0.14.0 (daijy)
 
 PIG-3960: Compile fail against Hadoop 2.4.0 after PIG-3913 (daijy)

Modified: pig/branches/branch-0.13/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.13/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1598042&r1=1598041&r2=1598042&view=diff
==============================================================================
--- pig/branches/branch-0.13/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/branches/branch-0.13/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Wed May 28 15:27:23 2014
@@ -331,6 +331,22 @@ public final class MRJobStats extends Jo
         }
     }
 
+    private class TaskStat {
+        int size;
+        long max;
+        long min;
+        long avg;
+        long median;
+
+        public TaskStat(int size, long max, long min, long avg, long median) {
+            this.size = size;
+            this.max = max;
+            this.min = min;
+            this.avg = avg;
+            this.median = median;
+        }
+    }
+
     void addMapReduceStatistics(Job job) {
         TaskReport[] maps = null;
         try {
@@ -338,26 +354,42 @@ public final class MRJobStats extends Jo
         } catch (IOException e) {
             LOG.warn("Failed to get map task report", e);
         }
-        if (maps != null && maps.length > 0) {
-            int size = maps.length;
-            long max = 0;
-            long min = Long.MAX_VALUE;
-            long median = 0;
-            long total = 0;
-            long durations[] = new long[size];
-
-            for (int i = 0; i < maps.length; i++) {
-                TaskReport rpt = maps[i];
-                long duration = rpt.getFinishTime() - rpt.getStartTime();
-                durations[i] = duration;
-                max = (duration > max) ? duration : max;
-                min = (duration < min) ? duration : min;
-                total += duration;
-            }
-            long avg = total / size;
+        TaskReport[] reduces = null;
+        try {
+            reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+        } catch (IOException e) {
+            LOG.warn("Failed to get reduce task report", e);
+        }
+        addMapReduceStatistics(maps, reduces);
+    }
+
+    private TaskStat getTaskStat(TaskReport[] tasks) {
+        int size = tasks.length;
+        long max = 0;
+        long min = Long.MAX_VALUE;
+        long median = 0;
+        long total = 0;
+        long durations[] = new long[size];
 
-            median = calculateMedianValue(durations);
-            setMapStat(size, max, min, avg, median);
+        for (int i = 0; i < tasks.length; i++) {
+            TaskReport rpt = tasks[i];
+            long duration = rpt.getFinishTime() - rpt.getStartTime();
+            durations[i] = duration;
+            max = (duration > max) ? duration : max;
+            min = (duration < min) ? duration : min;
+            total += duration;
+        }
+        long avg = total / size;
+
+        median = calculateMedianValue(durations);
+
+        return new TaskStat(size, max, min, avg, median);
+    }
+
+    private void addMapReduceStatistics(TaskReport[] maps, TaskReport[] reduces) {
+        if (maps != null && maps.length > 0) {
+            TaskStat st = getTaskStat(maps);
+            setMapStat(st.size, st.max, st.min, st.avg, st.median);
         } else {
             int m = conf.getInt("mapred.map.tasks", 1);
             if (m > 0) {
@@ -365,31 +397,9 @@ public final class MRJobStats extends Jo
             }
         }
 
-        TaskReport[] reduces = null;
-        try {
-            reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
-        } catch (IOException e) {
-            LOG.warn("Failed to get reduce task report", e);
-        }
         if (reduces != null && reduces.length > 0) {
-            int size = reduces.length;
-            long max = 0;
-            long min = Long.MAX_VALUE;
-            long median = 0;
-            long total = 0;
-            long durations[] = new long[size];
-
-            for (int i = 0; i < reduces.length; i++) {
-                TaskReport rpt = reduces[i];
-                long duration = rpt.getFinishTime() - rpt.getStartTime();
-                durations[i] = duration;
-                max = (duration > max) ? duration : max;
-                min = (duration < min) ? duration : min;
-                total += duration;
-            }
-            long avg = total / size;
-            median = calculateMedianValue(durations);
-            setReduceStat(size, max, min, avg, median);
+            TaskStat st = getTaskStat(reduces);
+            setReduceStat(st.size, st.max, st.min, st.avg, st.median);
         } else {
             int m = conf.getInt("mapred.reduce.tasks", 1);
             if (m > 0) {

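The patch above replaces two copies of the duration-aggregation loop (one for maps, one for reduces) with a single getTaskStat(TaskReport[]) helper returning a TaskStat value object; when no task reports are available, the code falls back to the configured mapred.map.tasks / mapred.reduce.tasks counts. Below is a minimal, Hadoop-free sketch of that aggregation. Durations stand in for rpt.getFinishTime() - rpt.getStartTime(), and the sort-based median is an assumption: the patch delegates to the existing calculateMedianValue, whose exact definition is not shown here.

    import java.util.Arrays;

    public class TaskStatSketch {
        // Value object mirroring the TaskStat inner class added by the patch.
        static final class TaskStat {
            final int size;
            final long max, min, avg, median;
            TaskStat(int size, long max, long min, long avg, long median) {
                this.size = size;
                this.max = max;
                this.min = min;
                this.avg = avg;
                this.median = median;
            }
        }

        // Aggregates task durations the way getTaskStat(TaskReport[]) does:
        // one pass for max/min/total, then average and median. Callers are
        // expected to guard against empty arrays, as the patched code does.
        static TaskStat getTaskStat(long[] durations) {
            long max = 0;
            long min = Long.MAX_VALUE;
            long total = 0;
            for (long duration : durations) {
                max = Math.max(max, duration);
                min = Math.min(min, duration);
                total += duration;
            }
            long avg = total / durations.length;
            long[] sorted = durations.clone();
            Arrays.sort(sorted);
            // Assumed median definition; the real code calls calculateMedianValue.
            long median = sorted[sorted.length / 2];
            return new TaskStat(durations.length, max, min, avg, median);
        }

        public static void main(String[] args) {
            TaskStat st = getTaskStat(new long[] { 300L, 100L, 200L });
            System.out.println("size=" + st.size + " max=" + st.max
                    + " min=" + st.min + " avg=" + st.avg + " median=" + st.median);
        }
    }

Because the new addMapReduceStatistics(TaskReport[], TaskReport[]) overload takes the reports directly, the statistics logic can now be exercised without constructing a JobClient or JobConf, which is exactly what the updated test below relies on.
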
Modified: pig/branches/branch-0.13/test/org/apache/pig/test/TestMRJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.13/test/org/apache/pig/test/TestMRJobStats.java?rev=1598042&r1=1598041&r2=1598042&view=diff
==============================================================================
--- pig/branches/branch-0.13/test/org/apache/pig/test/TestMRJobStats.java (original)
+++ pig/branches/branch-0.13/test/org/apache/pig/test/TestMRJobStats.java Wed May 28 15:27:23 2014
@@ -30,7 +30,6 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.JobID;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapreduce.Job;
@@ -111,8 +110,6 @@ public class TestMRJobStats {
 
     @Test
     public void testMedianMapReduceTime() throws Exception {
-
-        JobConf jobConf = new JobConf();
         JobClient jobClient = Mockito.mock(JobClient.class);
 
         // mock methods to return the predefined map and reduce task reports
@@ -124,10 +121,10 @@ public class TestMRJobStats {
         getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID);
         jobStats.setSuccessful(true);
 
-        getJobStatsMethod("addMapReduceStatistics", JobClient.class, Configuration.class)
-            .invoke(jobStats, jobClient, jobConf);
-        String msg = (String)getJobStatsMethod("getDisplayString", boolean.class)
-            .invoke(jobStats, false);
+        getJobStatsMethod("addMapReduceStatistics", TaskReport[].class, TaskReport[].class)
+            .invoke(jobStats, mapTaskReports, reduceTaskReports);
+        String msg = (String)getJobStatsMethod("getDisplayString")
+            .invoke(jobStats);
 
         System.out.println(JobStats.SUCCESS_HEADER);
         System.out.println(msg);
@@ -149,21 +146,15 @@ public class TestMRJobStats {
         Mockito.when(reduceTaskReports[0].getStartTime()).thenReturn(500L * ONE_THOUSAND);
         Mockito.when(reduceTaskReports[0].getFinishTime()).thenReturn(700L * ONE_THOUSAND);
 
-        JobConf jobConf = new JobConf();
-        JobClient jobClient = Mockito.mock(JobClient.class);
-
-        Mockito.when(jobClient.getMapTaskReports(jobID)).thenReturn(mapTaskReports);
-        Mockito.when(jobClient.getReduceTaskReports(jobID)).thenReturn(reduceTaskReports);
-
         PigStats.JobGraph jobGraph = new PigStats.JobGraph();
         MRJobStats jobStats = createJobStats("JobStatsTest", jobGraph);
         getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID);
         jobStats.setSuccessful(true);
 
-        getJobStatsMethod("addMapReduceStatistics", JobClient.class, Configuration.class)
-            .invoke(jobStats, jobClient, jobConf);
-        String msg = (String)getJobStatsMethod("getDisplayString", boolean.class)
-            .invoke(jobStats, false);
+        getJobStatsMethod("addMapReduceStatistics", TaskReport[].class, TaskReport[].class)
+            .invoke(jobStats, mapTaskReports, reduceTaskReports);
+        String msg = (String)getJobStatsMethod("getDisplayString")
+            .invoke(jobStats);
         System.out.println(JobStats.SUCCESS_HEADER);
         System.out.println(msg);
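
The updated test no longer mocks JobClient or builds a JobConf; it hands mocked TaskReport arrays straight to the private addMapReduceStatistics overload through the test's getJobStatsMethod reflection helper, so changes to the Hadoop client APIs can no longer break it. The following is a minimal, Hadoop-free sketch of that reflective-invocation mechanism; the class and method bodies here are hypothetical stand-ins, not the real MRJobStats.

    import java.lang.reflect.Method;

    public class ReflectiveStatsSketch {
        static class FakeJobStats {
            private String display = "";
            // Stands in for the private addMapReduceStatistics(TaskReport[], TaskReport[]).
            private void addMapReduceStatistics(long[] maps, long[] reduces) {
                display = "maps=" + maps.length + ", reduces=" + reduces.length;
            }
        }

        public static void main(String[] args) throws Exception {
            FakeJobStats jobStats = new FakeJobStats();
            // Mirrors getJobStatsMethod("addMapReduceStatistics",
            //         TaskReport[].class, TaskReport[].class) in the test:
            // look the method up by name and parameter types.
            Method m = FakeJobStats.class.getDeclaredMethod(
                    "addMapReduceStatistics", long[].class, long[].class);
            m.setAccessible(true); // required before invoking a private method
            // Each array is one argument to the varargs invoke(); supplying two
            // arguments avoids any single-array ambiguity.
            m.invoke(jobStats, new long[] { 100L }, new long[] { 200L });
            System.out.println(jobStats.display); // prints: maps=1, reduces=1
        }
    }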