Posted to commits@pig.apache.org by xu...@apache.org on 2015/10/31 23:49:40 UTC

svn commit: r1711702 - in /pig/branches/spark/src/org/apache/pig: backend/hadoop/executionengine/spark/converter/LoadConverter.java tools/pigstats/spark/SparkJobStats.java tools/pigstats/spark/SparkPigStats.java tools/pigstats/spark/SparkStatsUtil.java

Author: xuefu
Date: Sat Oct 31 22:49:40 2015
New Revision: 1711702

URL: http://svn.apache.org/viewvc?rev=1711702&view=rev
Log:
PIG-4655: Support InputStats in spark mode (Xianda via Xuefu)
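
The change mirrors the existing store-side counters: for every non-temporary POLoad, a counter is created in a new "Spark Input Counters" group, LoadConverter's ToTupleFunction increments it once per record on the executors, and SparkJobStats later reads the final value back on the driver to build an InputStats entry. The snippet below is a minimal, self-contained analogue of that create/increment/read cycle; it does not use the real SparkCounters or SparkPigStatusReporter classes (whose implementation is not part of this diff), and the InMemoryCounters class and its methods are invented purely for illustration.

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicLong;

    // Hypothetical stand-in for SparkCounters: one named counter per (group, name) pair.
    // The real class is expected to aggregate executor-side increments back on the
    // driver (e.g. via Spark accumulators); this sketch only shows the lifecycle.
    public class InMemoryCounters {
        private final ConcurrentHashMap<String, AtomicLong> counters = new ConcurrentHashMap<>();

        private static String key(String group, String name) {
            return group + "::" + name;
        }

        public void createCounter(String group, String name) {
            counters.putIfAbsent(key(group, name), new AtomicLong(0L));
        }

        public void increment(String group, String name, long delta) {
            AtomicLong counter = counters.get(key(group, name));
            if (counter != null) {
                counter.addAndGet(delta);
            }
        }

        public long getValue(String group, String name) {
            AtomicLong counter = counters.get(key(group, name));
            return counter == null ? 0L : counter.get();
        }

        public static void main(String[] args) {
            InMemoryCounters counters = new InMemoryCounters();
            // Same lifecycle as the patch: create before the job, increment per record, read at the end.
            counters.createCounter("Spark Input Counters", "Input records from _scope-1_input.txt");
            for (int i = 0; i < 3; i++) {
                counters.increment("Spark Input Counters", "Input records from _scope-1_input.txt", 1L);
            }
            System.out.println(counters.getValue("Spark Input Counters", "Input records from _scope-1_input.txt"));  // 3
        }
    }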

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1711702&r1=1711701&r2=1711702&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Sat Oct 31 22:49:40 2015
@@ -25,6 +25,9 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
 import scala.Function1;
 import scala.Tuple2;
 import scala.runtime.AbstractFunction1;
@@ -89,8 +92,26 @@ public class LoadConverter implements RD
                 jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
 
         registerUdfFiles();
+
+        ToTupleFunction ttf = new ToTupleFunction();
+
+        //create SparkCounter and set it for ToTupleFunction
+        if (!op.isTmpLoad()) {
+            String counterName = SparkStatsUtil.getLoadSparkCounterName(op);
+            SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
+            if (counterReporter.getCounters() != null) {
+                counterReporter.getCounters().createCounter(
+                        SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP,
+                        counterName);
+            }
+
+            ttf.setCounterGroupName(SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP);
+            ttf.setCounterName(counterName);
+            ttf.setSparkCounters(SparkPigStatusReporter.getInstance().getCounters());
+        }
+
         // map to get just RDD<Tuple>
-        return hadoopRDD.map(new ToTupleFunction(),
+        return hadoopRDD.map(ttf,
                 SparkUtil.getManifest(Tuple.class));
     }
 
@@ -113,10 +134,29 @@ public class LoadConverter implements RD
             AbstractFunction1<Tuple2<Text, Tuple>, Tuple> implements
             Function1<Tuple2<Text, Tuple>, Tuple>, Serializable {
 
+        private String counterGroupName;
+        private String counterName;
+        private SparkCounters sparkCounters;
+
         @Override
         public Tuple apply(Tuple2<Text, Tuple> v1) {
+            if (sparkCounters != null) {
+                sparkCounters.increment(counterGroupName, counterName, 1L);
+            }
             return v1._2();
         }
+
+        public void setCounterGroupName(String counterGroupName) {
+            this.counterGroupName = counterGroupName;
+        }
+
+        public void setCounterName(String counterName) {
+            this.counterName = counterName;
+        }
+
+        public void setSparkCounters(SparkCounters sparkCounters) {
+            this.sparkCounters = sparkCounters;
+        }
     }
 
     /**
@@ -166,4 +206,5 @@ public class LoadConverter implements RD
 
         return jobConf;
     }
+
 }
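
The wiring above follows the usual Spark pattern for per-record bookkeeping: the driver configures a serializable function (here ToTupleFunction, which now carries the counter group, counter name and SparkCounters reference), and each executor bumps the counter as it maps a record. Below is a rough standalone sketch of the same idea using a plain Spark accumulator instead of Pig's SparkCounters; it uses the Spark 1.x JavaSparkContext.accumulator API (deprecated since Spark 2.0), and the class name RecordCountSketch and the input values are invented for the example.

    import java.util.Arrays;

    import org.apache.spark.Accumulator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    // Minimal analogue of the ToTupleFunction wiring: create the counter on the driver,
    // increment it inside the map function on the executors, read it after the action.
    public class RecordCountSketch {
        public static void main(String[] args) {
            SparkConf conf = new SparkConf().setAppName("record-count-sketch").setMaster("local[1]");
            JavaSparkContext jsc = new JavaSparkContext(conf);

            final Accumulator<Integer> inputRecords = jsc.accumulator(0);

            JavaRDD<String> lines = jsc.parallelize(Arrays.asList("a", "b", "c"));
            JavaRDD<String> mapped = lines.map(line -> {
                inputRecords.add(1);   // per-record increment, as in ToTupleFunction.apply()
                return line;
            });
            mapped.count();            // force evaluation so the increments actually run

            System.out.println(inputRecords.value());   // 3
            jsc.stop();
        }
    }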

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1711702&r1=1711701&r2=1711702&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Sat Oct 31 22:49:40 2015
@@ -21,7 +21,7 @@ package org.apache.pig.tools.pigstats.sp
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import scala.Option;
 
 import com.google.common.collect.Maps;
@@ -33,6 +33,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -70,6 +71,21 @@ public class SparkJobStats extends JobSt
         }
     }
 
+    public void addInputStats(POLoad po, boolean success,
+                              boolean singleInput,
+                              Configuration conf){
+
+        long recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
+        long bytesRead = -1;
+        if (singleInput && stats.get("BytesRead") != null) {
+            bytesRead = stats.get("BytesRead");
+        }
+        InputStats inputStats = new InputStats(po.getLFile().getFileName(),
+                bytesRead, recordsCount, success);
+        inputStats.setConf(conf);
+
+        inputs.add(inputStats);
+    }
 
     public void collectStats(JobMetricsListener jobMetricsListener) {
         if (jobMetricsListener != null) {
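
addInputStats takes the record count from the load's Spark counter, but it only attributes the job-level "BytesRead" metric to a load when that load is the job's single input; with several inputs the bytes cannot be split per load, so -1 (unknown) is reported. This is presumably also why SparkPigStats now calls collectStats before addOutputInfo and the new input collection: the stats map has to be populated before "BytesRead" is looked up. The small class below is a hypothetical illustration of just that bytesRead rule (the class and method names are invented):

    import java.util.HashMap;
    import java.util.Map;

    // Hypothetical illustration of the bytesRead rule in SparkJobStats.addInputStats():
    // the aggregate "BytesRead" metric is attributed to a load only when it is the
    // job's single input; otherwise the size is reported as -1 (unknown).
    public class BytesReadRule {
        static long bytesReadFor(Map<String, Long> jobStats, boolean singleInput) {
            Long bytesRead = jobStats.get("BytesRead");
            return (singleInput && bytesRead != null) ? bytesRead : -1L;
        }

        public static void main(String[] args) {
            Map<String, Long> jobStats = new HashMap<String, Long>();
            jobStats.put("BytesRead", 2048L);
            System.out.println(bytesReadFor(jobStats, true));    // 2048
            System.out.println(bytesReadFor(jobStats, false));   // -1
        }
    }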

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1711702&r1=1711701&r2=1711702&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Sat Oct 31 22:49:40 2015
@@ -18,20 +18,25 @@
 package org.apache.pig.tools.pigstats.spark;
 
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
@@ -43,6 +48,8 @@ public class SparkPigStats extends PigSt
     private Map<SparkJobStats,SparkOperator> jobSparkOperatorMap = new HashMap<SparkJobStats, SparkOperator>();
     private static final Log LOG = LogFactory.getLog(SparkPigStats.class);
 
+    private Set<SparkOperator> sparkOperatorsSet = new HashSet<SparkOperator>();
+
     private SparkScriptState sparkScriptState;
 
     public SparkPigStats() {
@@ -63,12 +70,14 @@ public class SparkPigStats extends PigSt
         boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
         SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
         jobStats.setSuccessful(isSuccess);
-        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
         jobStats.collectStats(jobMetricsListener);
+        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
     }
 
+
     public void addFailJobStats(POStore poStore, SparkOperator sparkOperator, String jobId,
                                 JobMetricsListener jobMetricsListener,
                                 JavaSparkContext sparkContext,
@@ -77,8 +86,9 @@ public class SparkPigStats extends PigSt
         boolean isSuccess = false;
         SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
         jobStats.setSuccessful(isSuccess);
-        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
         jobStats.collectStats(jobMetricsListener);
+        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+        addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
         jobSparkOperatorMap.put(jobStats, sparkOperator);
         jobPlan.add(jobStats);
         if (e != null) {
@@ -158,4 +168,42 @@ public class SparkPigStats extends PigSt
     public OutputStats result(String alias) {
         return null;
     }
+
+    /**
+     * SparkPlan can have many SparkOperators.
+     * Each SparkOperator can have multiple POStores.
+     * We currently collect stats once for every POStore,
+     * but do not want to collect input stats for every POStore.
+     *
+     * e.g. After multiQuery optimization, the sparkOperator may look like this:
+     * POLoad_1             (PhysicalPlan) ...POStore_A
+     *         \          /
+     *          ...POSplit
+     *         /          \
+     * POLoad_2            (PhysicalPlan) ...POStore_B
+     */
+    private void addInputInfoForSparkOper(SparkOperator sparkOperator,
+                                          SparkJobStats jobStats,
+                                          boolean isSuccess,
+                                          JobMetricsListener jobMetricsListener,
+                                          Configuration conf) {
+        //to avoid repetition
+        if (sparkOperatorsSet.contains(sparkOperator)) {
+            return;
+        }
+
+        try {
+            List<POLoad> poLoads = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLoad.class);
+            for (POLoad load : poLoads) {
+                if (!load.isTmpLoad()) {
+                    jobStats.addInputStats(load, isSuccess, (poLoads.size() == 1), conf);
+                }
+            }
+        } catch (VisitorException ve) {
+            LOG.warn(ve);
+        }
+
+        sparkOperatorsSet.add(sparkOperator);
+    }
+
 }
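
The sparkOperatorsSet guard exists because stats are reported once per POStore, and after multiquery optimization a single SparkOperator can carry several POStores reading the same loads; without the guard each input would be counted once per store. The class below is a made-up, stripped-down illustration of that once-per-operator behaviour (names and values are invented):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;

    // Hypothetical illustration of the dedup guard in addInputInfoForSparkOper():
    // when the same operator is reported once per POStore, only the first call
    // records its inputs.
    public class OncePerOperator {
        private final Set<String> seenOperators = new HashSet<String>();
        private final List<String> recordedInputs = new ArrayList<String>();

        void addInputInfo(String operatorId, List<String> loads) {
            if (!seenOperators.add(operatorId)) {
                return;                                  // already handled for an earlier POStore
            }
            recordedInputs.addAll(loads);
        }

        public static void main(String[] args) {
            OncePerOperator stats = new OncePerOperator();
            List<String> loads = Arrays.asList("input1.txt", "input2.txt");
            stats.addInputInfo("scope-10", loads);       // first POStore of the operator
            stats.addInputInfo("scope-10", loads);       // second POStore: skipped
            System.out.println(stats.recordedInputs);    // [input1.txt, input2.txt]
        }
    }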

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1711702&r1=1711701&r2=1711702&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Sat Oct 31 22:49:40 2015
@@ -19,6 +19,7 @@
 package org.apache.pig.tools.pigstats.spark;
 
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -29,8 +30,10 @@ import org.apache.spark.api.java.JavaSpa
 
 public class SparkStatsUtil {
 
-    public static final String SPARK_STORE_COUNTER_GROUP = "SparkStoreCounters";
+    public static final String SPARK_STORE_COUNTER_GROUP = "Spark Store Counters";
     public static final String SPARK_STORE_RECORD_COUNTER = "Output records in ";
+    public static final String SPARK_INPUT_COUNTER_GROUP = "Spark Input Counters";
+    public static final String SPARK_INPUT_RECORD_COUNTER = "Input records from ";
 
   public static void waitForJobAddStats(int jobID,
                                         POStore poStore, SparkOperator sparkOperator,
@@ -75,11 +78,27 @@ public class SparkStatsUtil {
         return sb.toString();
     }
 
+    public static String getLoadSparkCounterName(POLoad load) {
+        String shortName = PigStatsUtil.getShortName(load.getLFile().getFileName());
+
+        StringBuffer sb = new StringBuffer(SPARK_INPUT_RECORD_COUNTER);
+        sb.append("_");
+        sb.append(load.getOperatorKey());
+        sb.append("_");
+        sb.append(shortName);
+        return sb.toString();
+    }
+
     public static long getStoreSparkCounterValue(POStore store) {
         SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
         return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getStoreSparkCounterName(store));
     }
 
+    public static long getLoadSparkCounterValue(POLoad load) {
+        SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
+        return reporter.getCounters().getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load));
+    }
+
     public static boolean isJobSuccess(int jobID,
                                     JavaSparkContext sparkContext) {
       JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
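
getLoadSparkCounterName builds the counter name from the new "Input records from " prefix, the load's operator key and the short file name, following the same shape as the existing store counter names. The snippet below only shows what such a name looks like; the operator key and file name are made-up example values (Pig operator keys normally print as "scope-<n>"):

    // Hypothetical illustration of the name produced by getLoadSparkCounterName():
    // prefix + "_" + operator key + "_" + short file name.
    public class LoadCounterNameExample {
        public static void main(String[] args) {
            String prefix = "Input records from ";    // SparkStatsUtil.SPARK_INPUT_RECORD_COUNTER
            String operatorKey = "scope-12";          // example value
            String shortName = "students.txt";        // example value
            String counterName = prefix + "_" + operatorKey + "_" + shortName;
            System.out.println(counterName);          // Input records from _scope-12_students.txt
        }
    }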