You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2014/09/28 02:58:27 UTC

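svn commit: r1628036 - in /pig/trunk: ./ shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/ shims/src/hadoop23/org/apache/hadoop/mapred/ shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/tools/pigstats/ src/org/apache/pig/tools/pigstats/mapreduce/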

Author: rohini
Date: Sun Sep 28 00:58:26 2014
New Revision: 1628036

URL: http://svn.apache.org/r1628036
Log:
PIG-4050: HadoopShims.getTaskReports() can cause OOM with Hadoop 2 (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java
    pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Sep 28 00:58:26 2014
@@ -84,6 +84,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-4050: HadoopShims.getTaskReports() can cause OOM with Hadoop 2 (rohini)
+
 PIG-4176: Fix tez e2e test Bloom_[1-3] (daijy)
 
 PIG-4195: Support loading char/varchar data in OrcStorage (daijy)

Modified: pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop20/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Sun Sep 28 00:58:26 2014
@@ -18,6 +18,8 @@
 package org.apache.pig.backend.hadoop.executionengine.shims;
 
 import java.io.IOException;
+import java.util.Arrays;
+import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -184,15 +186,19 @@ public class HadoopShims {
             runningJob.killJob();
     }
 
-    public static TaskReport[] getTaskReports(Job job, TaskType type) throws IOException {
+    public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
         if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
             LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
             return null;
         }
         JobClient jobClient = job.getJobClient();
-        return (type == TaskType.MAP)
-                ? jobClient.getMapTaskReports(job.getAssignedJobID())
-                        : jobClient.getReduceTaskReports(job.getAssignedJobID());
+        TaskReport[] reports = null;
+        if (type == TaskType.MAP) {
+            reports = jobClient.getMapTaskReports(job.getAssignedJobID());
+        } else {
+            reports = jobClient.getReduceTaskReports(job.getAssignedJobID());
+        }
+        return reports == null ? null : Arrays.asList(reports).iterator();
     }
     
     public static boolean isHadoopYARN() {

Modified: pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/hadoop/mapred/DowngradeHelper.java Sun Sep 28 00:58:26 2014
@@ -17,11 +17,39 @@
  */
 package org.apache.hadoop.mapred;
 
+import java.util.Iterator;
+
 public class DowngradeHelper {
-    // This is required since hadoop 2 TaskReport allows 
+    // This is required since hadoop 2 TaskReport allows
     // only package level access to this api
-    public static TaskReport[] downgradeTaskReports(
+    public static Iterator<TaskReport> downgradeTaskReports(
             org.apache.hadoop.mapreduce.TaskReport[] reports) {
-        return TaskReport.downgradeArray(reports);
+        return reports == null ? null : new TaskReportIterator(reports);
+    }
+
+    private static class TaskReportIterator implements Iterator<TaskReport> {
+
+        private org.apache.hadoop.mapreduce.TaskReport[] reports;
+        private int curIndex = 0;
+
+        public TaskReportIterator(org.apache.hadoop.mapreduce.TaskReport[] reports) {
+            this.reports = reports;
+        }
+
+        @Override
+        public boolean hasNext() {
+            return curIndex < this.reports.length ;
+        }
+
+        @Override
+        public TaskReport next() {
+            return TaskReport.downgrade(reports[curIndex++]);
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException();
+        }
+
     }
 }

Modified: pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java
URL: http://svn.apache.org/viewvc/pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java (original)
+++ pig/trunk/shims/src/hadoop23/org/apache/pig/backend/hadoop/executionengine/shims/HadoopShims.java Sun Sep 28 00:58:26 2014
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.lang.reflect.Method;
+import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -214,7 +215,7 @@ public class HadoopShims {
         }
     }
 
-    public static TaskReport[] getTaskReports(Job job, TaskType type) throws IOException {
+    public static Iterator<TaskReport> getTaskReports(Job job, TaskType type) throws IOException {
         if (job.getJobConf().getBoolean(PigConfiguration.PIG_NO_TASK_REPORT, false)) {
             LOG.info("TaskReports are disabled for job: " + job.getAssignedJobID());
             return null;
@@ -227,7 +228,7 @@ public class HadoopShims {
             throw new IOException(ir);
         }
     }
-    
+
     public static boolean isHadoopYARN() {
         return true;
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/Launcher.java Sun Sep 28 00:58:26 2014
@@ -22,6 +22,7 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -160,23 +161,25 @@ public abstract class Launcher {
         return (int) (Math.ceil(prog)) == 1;
     }
 
-    protected long computeTimeSpent(TaskReport[] taskReports) {
+    protected long computeTimeSpent(Iterator<TaskReport> taskReports) {
         long timeSpent = 0;
-        for (TaskReport r : taskReports) {
+        while (taskReports.hasNext()) {
+            TaskReport r = taskReports.next();
             timeSpent += (r.getFinishTime() - r.getStartTime());
         }
         return timeSpent;
     }
 
-    protected void getErrorMessages(TaskReport reports[], String type,
+    protected void getErrorMessages(Iterator<TaskReport> reports, String type,
             boolean errNotDbg, PigContext pigContext) throws Exception {
-        for (int i = 0; i < reports.length; i++) {
-            String msgs[] = reports[i].getDiagnostics();
+        while(reports.hasNext()) {
+            TaskReport report = reports.next();
+            String msgs[] = report.getDiagnostics();
             ArrayList<Exception> exceptions = new ArrayList<Exception>();
             String exceptionCreateFailMsg = null;
             boolean jobFailed = false;
             if (msgs.length > 0) {
-                if (HadoopShims.isJobFailed(reports[i])) {
+                if (HadoopShims.isJobFailed(report)) {
                     jobFailed = true;
                 }
                 Set<String> errorMessageSet = new HashSet<String>();
@@ -199,7 +202,7 @@ public abstract class Launcher {
                             }
                         } else {
                             log.debug("Error message from task (" + type + ") "
-                                    + reports[i].getTaskID() + msgs[j]);
+                                    + report.getTaskID() + msgs[j]);
                         }
                     }
                 }
@@ -223,7 +226,7 @@ public abstract class Launcher {
                 if (exceptions.size() > 1) {
                     for (int j = 0; j < exceptions.size(); ++j) {
                         String headerMessage = "Error message from task ("
-                                + type + ") " + reports[i].getTaskID();
+                                + type + ") " + report.getTaskID();
                         LogUtils.writeLog(exceptions.get(j), pigContext
                                 .getProperties().getProperty("pig.logfile"),
                                 log, false, headerMessage, false, false);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Sun Sep 28 00:58:26 2014
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -797,13 +798,13 @@ public class MapReduceLauncher extends L
             throw new ExecException(backendException);
         }
         try {
-            TaskReport[] mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
+            Iterator<TaskReport> mapRep = HadoopShims.getTaskReports(job, TaskType.MAP);
             if (mapRep != null) {
                 getErrorMessages(mapRep, "map", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(mapRep);
                 mapRep = null;
             }
-            TaskReport[] redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
+            Iterator<TaskReport> redRep = HadoopShims.getTaskReports(job, TaskType.REDUCE);
             if (redRep != null) {
                 getErrorMessages(redRep, "reduce", errNotDbg, pigContext);
                 totalHadoopTimeSpent += computeTimeSpent(redRep);

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java Sun Sep 28 00:58:26 2014
@@ -21,7 +21,6 @@ package org.apache.pig.tools.pigstats;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -182,17 +181,17 @@ public abstract class JobStats extends O
      * @param durations
      * @return median value
      */
-    protected long calculateMedianValue(long[] durations) {
+    protected long calculateMedianValue(List<Long> durations) {
         long median;
         // figure out the median
-        Arrays.sort(durations);
-        int midPoint = durations.length /2;
-        if ((durations.length & 1) == 1) {
+        Collections.sort(durations);
+        int midPoint = durations.size() /2;
+        if ((durations.size() & 1) == 1) {
             // odd
-            median = durations[midPoint];
+            median = durations.get(midPoint);
         } else {
             // even
-            median = (durations[midPoint-1] + durations[midPoint]) / 2;
+            median = (durations.get(midPoint-1) + durations.get(midPoint)) / 2;
         }
         return median;
     }

Modified: pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java?rev=1628036&r1=1628035&r2=1628036&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java (original)
+++ pig/trunk/src/org/apache/pig/tools/pigstats/mapreduce/MRJobStats.java Sun Sep 28 00:58:26 2014
@@ -31,9 +31,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
-import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobID;
-import org.apache.hadoop.mapred.RunningJob;
 import org.apache.hadoop.mapred.TaskReport;
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapreduce.TaskType;
@@ -349,13 +347,13 @@ public final class MRJobStats extends Jo
     }
 
     void addMapReduceStatistics(Job job) {
-        TaskReport[] maps = null;
+        Iterator<TaskReport> maps = null;
         try {
             maps = HadoopShims.getTaskReports(job, TaskType.MAP);
         } catch (IOException e) {
             LOG.warn("Failed to get map task report", e);
         }
-        TaskReport[] reduces = null;
+        Iterator<TaskReport> reduces = null;
         try {
             reduces = HadoopShims.getTaskReports(job, TaskType.REDUCE);
         } catch (IOException e) {
@@ -364,21 +362,22 @@ public final class MRJobStats extends Jo
         addMapReduceStatistics(maps, reduces);
     }
 
-    private TaskStat getTaskStat(TaskReport[] tasks) {
-        int size = tasks.length;
+    private TaskStat getTaskStat(Iterator<TaskReport> tasks) {
+        int size = 0;
         long max = 0;
         long min = Long.MAX_VALUE;
         long median = 0;
         long total = 0;
-        long durations[] = new long[size];
+        List<Long> durations = new ArrayList<Long>();
 
-        for (int i = 0; i < tasks.length; i++) {
-            TaskReport rpt = tasks[i];
+        while(tasks.hasNext()){
+            TaskReport rpt = tasks.next();
             long duration = rpt.getFinishTime() - rpt.getStartTime();
-            durations[i] = duration;
+            durations.add(duration);
             max = (duration > max) ? duration : max;
             min = (duration < min) ? duration : min;
             total += duration;
+            size++;
         }
         long avg = total / size;
 
@@ -387,8 +386,8 @@ public final class MRJobStats extends Jo
         return new TaskStat(size, max, min, avg, median);
     }
 
-    private void addMapReduceStatistics(TaskReport[] maps, TaskReport[] reduces) {
-        if (maps != null && maps.length > 0) {
+    private void addMapReduceStatistics(Iterator<TaskReport> maps, Iterator<TaskReport> reduces) {
+        if (maps != null && maps.hasNext()) {
             TaskStat st = getTaskStat(maps);
             setMapStat(st.size, st.max, st.min, st.avg, st.median);
         } else {
@@ -398,7 +397,7 @@ public final class MRJobStats extends Jo
             }
         }
 
-        if (reduces != null && reduces.length > 0) {
+        if (reduces != null && reduces.hasNext()) {
             TaskStat st = getTaskStat(reduces);
             setReduceStat(st.size, st.max, st.min, st.avg, st.median);
         } else {