You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/09/28 02:58:27 UTC
svn commit: r1628036 - in /pig/trunk: ./
shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/
shims/src/hadoop23/org/apache/hadoop/mapred/
shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/
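src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/tools/pigstats/
src/org/apache/pig/tools/pigstats/mapreduce/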
Author: rohini
Date: Sun Sep 28 00:58:26 2014
New Revision: 1628036
URL: http://svn.apache.org/r1628036
Log:
PIG-4050: HadoopShims.getTaskReports() can cause OOM with Hadoop 2 (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java
pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Sep 28 00:58:26 2014
@@ -84,6 +84,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-4050: HadoopShims.getTaskReports() can cause OOM with Hadoop 2 (rohini)
+
PIG-4176: Fix tez e2e test Bloom_[1-3] (daijy)
PIG-4195: Support loading char/varchar data in OrcStorage (daijy)
Modified: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Sun Sep 28 00:58:26 2014
@@ -18,6 +18,8 @@
package org.apache.pig.backend.hadoop.executionengine.shims;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -184,15 +186,19 @@ public class HadoopShims {
runningJob.killJob();
}
- public static TaskReport[] getTaskReports(Job job, TaskType type) throws IOException {
+ public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
return null;
}
JobClient jobClient = job.getJobClient();
- return (type == TaskType.MAP)
- ? jobClient.getMapTaskReports(job.getAssignedJobID())
- : jobClient.getReduceTaskReports(job.getAssignedJobID());
+ TaskReport[] reports = null;
+ if (type == TaskType.MAP) {
+ reports = jobClient.getMapTaskReports(job.getAssignedJobID());
+ } else {
+ reports = jobClient.getReduceTaskReports(job.getAssignedJobID());
+ }
+ return reports == null ? null : Arrays.asList(reports).iterator();
}
public static boolean isHadoopYARN() {
Modified: pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java Sun Sep 28 00:58:26 2014
@@ -17,11 +17,39 @@
*/
package org.apache.hadoop.mapred;
+import java.util.Iterator;
+
public class DowngradeHelper {
- // This is required since hadoop 2 TaskReport allows
+ // This is required since hadoop 2 TaskReport allows
// only package level access to this api
- public static TaskReport[] downgradeTaskReports(
+ public static Iterator<TaskReport> downgradeTaskReports(
org.apache.hadoop.mapreduce.TaskReport[] reports) {
- return TaskReport.downgradeArray(reports);
+ return reports == null ? null : new TaskReportIterator(reports);
+ }
+
+ private static class TaskReportIterator implements Iterator<TaskReport> {
+
+ private org.apache.hadoop.mapreduce.TaskReport[] reports;
+ private int curIndex = 0;
+
+ public TaskReportIterator(org.apache.hadoop.mapreduce.TaskReport[] reports) {
+ this.reports = reports;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return curIndex < this.reports.length ;
+ }
+
+ @Override
+ public TaskReport next() {
+ return TaskReport.downgrade(reports[curIndex++]);
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
}
}
Modified: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Sun Sep 28 00:58:26 2014
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.lang.reflect.Method;
+import java.util.Iterator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -214,7 +215,7 @@ public class HadoopShims {
}
}
- public static TaskReport[] getTaskReports(Job job, TaskType type) throws IOException {
+ public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
return null;
@@ -227,7 +228,7 @@ public class HadoopShims {
throw new IOException(ir);
}
}
-
+
public static boolean isHadoopYARN() {
return true;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Sun Sep 28 00:58:26 2014
@@ -22,6 +22,7 @@ import java.io.PrintStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -160,23 +161,25 @@ public abstract class Launcher {
return (int) (Math.ceil(prog)) == 1;
}
- protected long computeTimeSpent(TaskReport[] taskReports) {
+ protected long computeTimeSpent(Iterator<TaskReport> taskReports) {
long timeSpent = 0;
- for (TaskReport r : taskReports) {
+ while (taskReports.hasNext()) {
+ TaskReport r = taskReports.next();
timeSpent += (r.getFinishTime() - r.getStartTime());
}
return timeSpent;
}
- protected void getErrorMessages(TaskReport reports[], String type,
+ protected void getErrorMessages(Iterator<TaskReport> reports, String type,
boolean errNotDbg, PigContext pigContext) throws Exception {
- for (int i = 0; i < reports.length; i++) {
- String msgs[] = reports[i].getDiagnostics();
+ while(reports.hasNext()) {
+ TaskReport report = reports.next();
+ String msgs[] = report.getDiagnostics();
ArrayList<Exception> exceptions = new ArrayList<Exception>();
String exceptionCreateFailMsg = null;
boolean jobFailed = false;
if (msgs.length > 0) {
- if (HadoopShims.isJobFailed(reports[i])) {
+ if (HadoopShims.isJobFailed(report)) {
jobFailed = true;
}
Set<String> errorMessageSet = new HashSet<String>();
@@ -199,7 +202,7 @@ public abstract class Launcher {
}
} else {
log.debug("Error message from task (" + type + ") "
- + reports[i].getTaskID() + msgs[j]);
+ + report.getTaskID() + msgs[j]);
}
}
}
@@ -223,7 +226,7 @@ public abstract class Launcher {
if (exceptions.size() > 1) {
for (int j = 0; j < exceptions.size(); ++j) {
String headerMessage = "Error message from task ("
- + type + ") " + reports[i].getTaskID();
+ + type + ") " + report.getTaskID();
LogUtils.writeLog(exceptions.get(j), pigContext
.getProperties().getProperty("pig.logfile"),
log, false, headerMessage, false, false);
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Sun Sep 28 00:58:26 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.io.PrintStream;
import java.util.ArrayList;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -797,13 +798,13 @@ public class MapReduceLauncher extends L
throw new ExecException(backendException);
}
try {
- TaskReport[] mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
+ Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
if (mapRep != null) {
getErrorMessages(mapRep, "map", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(mapRep);
mapRep = null;
}
- TaskReport[] redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+ Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
if (redRep != null) {
getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
totalHadoopTimeSpent += computeTimeSpent(redRep);
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Sun Sep 28 00:58:26 2014
@@ -21,7 +21,6 @@ package org.apache.pig.tools.pigstats;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -182,17 +181,17 @@ public abstract class JobStats extends O
* @param durations
* @return median value
*/
- protected long calculateMedianValue(long[] durations) {
+ protected long calculateMedianValue(List<Long> durations) {
long median;
// figure out the median
- Arrays.sort(durations);
- int midPoint = durations.length /2;
- if ((durations.length & 1) == 1) {
+ Collections.sort(durations);
+ int midPoint = durations.size() /2;
+ if ((durations.size() & 1) == 1) {
// odd
- median = durations[midPoint];
+ median = durations.get(midPoint);
} else {
// even
- median = (durations[midPoint-1] + durations[midPoint]) / 2;
+ median = (durations.get(midPoint-1) + durations.get(midPoint)) / 2;
}
return median;
}
Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Sun Sep 28 00:58:26 2014
@@ -31,9 +31,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapreduce.TaskType;
@@ -349,13 +347,13 @@ public final class MRJobStats extends Jo
}
void addMapReduceStatistics(Job job) {
- TaskReport[] maps = null;
+ Iterator<TaskReport> maps = null;
try {
maps = HadoopShims.getTaskReports(job, TaskType.MAP);
} catch (IOException e) {
LOG.warn("Failed to get map task report", e);
}
- TaskReport[] reduces = null;
+ Iterator<TaskReport> reduces = null;
try {
reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
} catch (IOException e) {
@@ -364,21 +362,22 @@ public final class MRJobStats extends Jo
addMapReduceStatistics(maps, reduces);
}
- private TaskStat getTaskStat(TaskReport[] tasks) {
- int size = tasks.length;
+ private TaskStat getTaskStat(Iterator<TaskReport> tasks) {
+ int size = 0;
long max = 0;
long min = Long.MAX_VALUE;
long median = 0;
long total = 0;
- long durations[] = new long[size];
+ List<Long> durations = new ArrayList<Long>();
- for (int i = 0; i < tasks.length; i++) {
- TaskReport rpt = tasks[i];
+ while(tasks.hasNext()){
+ TaskReport rpt = tasks.next();
long duration = rpt.getFinishTime() - rpt.getStartTime();
- durations[i] = duration;
+ durations.add(duration);
max = (duration > max) ? duration : max;
min = (duration < min) ? duration : min;
total += duration;
+ size++;
}
long avg = total / size;
@@ -387,8 +386,8 @@ public final class MRJobStats extends Jo
return new TaskStat(size, max, min, avg, median);
}
- private void addMapReduceStatistics(TaskReport[] maps, TaskReport[] reduces) {
- if (maps != null && maps.length > 0) {
+ private void addMapReduceStatistics(Iterator<TaskReport> maps, Iterator<TaskReport> reduces) {
+ if (maps != null && maps.hasNext()) {
TaskStat st = getTaskStat(maps);
setMapStat(st.size, st.max, st.min, st.avg, st.median);
} else {
@@ -398,7 +397,7 @@ public final class MRJobStats extends Jo
}
}
- if (reduces != null && reduces.length > 0) {
+ if (reduces != null && reduces.hasNext()) {
TaskStat st = getTaskStat(reduces);
setReduceStat(st.size, st.max, st.min, st.avg, st.median);
} else {