You are viewing a plain-text version of this content. (The hyperlink to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by an...@apache.org on 2014/05/28 17:26:04 UTC
svn commit: r1598040 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
test/org/apache/pig/test/TestMRJobStats.java
Author: aniket486
Date: Wed May 28 15:26:04 2014
New Revision: 1598040
URL: http://svn.apache.org/r1598040
Log:
PIG-3958: TestMRJobStats is broken in 0.13 and trunk (aniket486)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
pig/trunk/test/org/apache/pig/test/TestMRJobStats.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1598040&r1=1598039&r2=1598040&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed May 28 15:26:04 2014
@@ -153,6 +153,8 @@ PIG-3882: Multiquery off mode execution
BUG FIXES
+PIG-3958: TestMRJobStats is broken in 0.13 and trunk (aniket486)
+
PIG-3949: HiveColumnarStorage compile failure with Hive 0.14.0 (daijy)
PIG-3960: Compile fail against Hadoop 2.4.0 after PIG-3913 (daijy)
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1598040&r1=1598039&r2=1598040&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Wed May 28 15:26:04 2014
@@ -331,6 +331,22 @@ public final class MRJobStats extends Jo
}
}
+ private class TaskStat {
+ int size;
+ long max;
+ long min;
+ long avg;
+ long median;
+
+ public TaskStat(int size, long max, long min, long avg, long median) {
+ this.size = size;
+ this.max = max;
+ this.min = min;
+ this.avg = avg;
+ this.median = median;
+ }
+ }
+
void addMapReduceStatistics(Job job) {
TaskReport[] maps = null;
try {
@@ -338,26 +354,42 @@ public final class MRJobStats extends Jo
} catch (IOException e) {
LOG.warn("Failed to get map task report", e);
}
- if (maps != null && maps.length > 0) {
- int size = maps.length;
- long max = 0;
- long min = Long.MAX_VALUE;
- long median = 0;
- long total = 0;
- long durations[] = new long[size];
-
- for (int i = 0; i < maps.length; i++) {
- TaskReport rpt = maps[i];
- long duration = rpt.getFinishTime() - rpt.getStartTime();
- durations[i] = duration;
- max = (duration > max) ? duration : max;
- min = (duration < min) ? duration : min;
- total += duration;
- }
- long avg = total / size;
+ TaskReport[] reduces = null;
+ try {
+ reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+ } catch (IOException e) {
+ LOG.warn("Failed to get reduce task report", e);
+ }
+ addMapReduceStatistics(maps, reduces);
+ }
+
+ private TaskStat getTaskStat(TaskReport[] tasks) {
+ int size = tasks.length;
+ long max = 0;
+ long min = Long.MAX_VALUE;
+ long median = 0;
+ long total = 0;
+ long durations[] = new long[size];
- median = calculateMedianValue(durations);
- setMapStat(size, max, min, avg, median);
+ for (int i = 0; i < tasks.length; i++) {
+ TaskReport rpt = tasks[i];
+ long duration = rpt.getFinishTime() - rpt.getStartTime();
+ durations[i] = duration;
+ max = (duration > max) ? duration : max;
+ min = (duration < min) ? duration : min;
+ total += duration;
+ }
+ long avg = total / size;
+
+ median = calculateMedianValue(durations);
+
+ return new TaskStat(size, max, min, avg, median);
+ }
+
+ private void addMapReduceStatistics(TaskReport[] maps, TaskReport[] reduces) {
+ if (maps != null && maps.length > 0) {
+ TaskStat st = getTaskStat(maps);
+ setMapStat(st.size, st.max, st.min, st.avg, st.median);
} else {
int m = conf.getInt("mapred.map.tasks", 1);
if (m > 0) {
@@ -365,31 +397,9 @@ public final class MRJobStats extends Jo
}
}
- TaskReport[] reduces = null;
- try {
- reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
- } catch (IOException e) {
- LOG.warn("Failed to get reduce task report", e);
- }
if (reduces != null && reduces.length > 0) {
- int size = reduces.length;
- long max = 0;
- long min = Long.MAX_VALUE;
- long median = 0;
- long total = 0;
- long durations[] = new long[size];
-
- for (int i = 0; i < reduces.length; i++) {
- TaskReport rpt = reduces[i];
- long duration = rpt.getFinishTime() - rpt.getStartTime();
- durations[i] = duration;
- max = (duration > max) ? duration : max;
- min = (duration < min) ? duration : min;
- total += duration;
- }
- long avg = total / size;
- median = calculateMedianValue(durations);
- setReduceStat(size, max, min, avg, median);
+ TaskStat st = getTaskStat(reduces);
+ setReduceStat(st.size, st.max, st.min, st.avg, st.median);
} else {
int m = conf.getInt("mapred.reduce.tasks", 1);
if (m > 0) {
Modified: pig/trunk/test/org/apache/pig/test/TestMRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMRJobStats.java?rev=1598040&r1=1598039&r2=1598040&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMRJobStats.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMRJobStats.java Wed May 28 15:26:04 2014
@@ -30,7 +30,6 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
-import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapreduce.Job;
@@ -111,8 +110,6 @@ public class TestMRJobStats {
@Test
public void testMedianMapReduceTime() throws Exception {
-
- JobConf jobConf = new JobConf();
JobClient jobClient = Mockito.mock(JobClient.class);
// mock methods to return the predefined map and reduce task reports
@@ -124,10 +121,10 @@ public class TestMRJobStats {
getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID);
jobStats.setSuccessful(true);
- getJobStatsMethod("addMapReduceStatistics", JobClient.class, Configuration.class)
- .invoke(jobStats, jobClient, jobConf);
- String msg = (String)getJobStatsMethod("getDisplayString", boolean.class)
- .invoke(jobStats, false);
+ getJobStatsMethod("addMapReduceStatistics", TaskReport[].class, TaskReport[].class)
+ .invoke(jobStats, mapTaskReports, reduceTaskReports);
+ String msg = (String)getJobStatsMethod("getDisplayString")
+ .invoke(jobStats);
System.out.println(JobStats.SUCCESS_HEADER);
System.out.println(msg);
@@ -149,21 +146,15 @@ public class TestMRJobStats {
Mockito.when(reduceTaskReports[0].getStartTime()).thenReturn(500L * ONE_THOUSAND);
Mockito.when(reduceTaskReports[0].getFinishTime()).thenReturn(700L * ONE_THOUSAND);
- JobConf jobConf = new JobConf();
- JobClient jobClient = Mockito.mock(JobClient.class);
-
- Mockito.when(jobClient.getMapTaskReports(jobID)).thenReturn(mapTaskReports);
- Mockito.when(jobClient.getReduceTaskReports(jobID)).thenReturn(reduceTaskReports);
-
PigStats.JobGraph jobGraph = new PigStats.JobGraph();
MRJobStats jobStats = createJobStats("JobStatsTest", jobGraph);
getJobStatsMethod("setId", JobID.class).invoke(jobStats, jobID);
jobStats.setSuccessful(true);
- getJobStatsMethod("addMapReduceStatistics", JobClient.class, Configuration.class)
- .invoke(jobStats, jobClient, jobConf);
- String msg = (String)getJobStatsMethod("getDisplayString", boolean.class)
- .invoke(jobStats, false);
+ getJobStatsMethod("addMapReduceStatistics", TaskReport[].class, TaskReport[].class)
+ .invoke(jobStats, mapTaskReports, reduceTaskReports);
+ String msg = (String)getJobStatsMethod("getDisplayString")
+ .invoke(jobStats);
System.out.println(JobStats.SUCCESS_HEADER);
System.out.println(msg);