You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@pig.apache.org by xu...@apache.org on 2015/10/31 23:49:40 UTC
svn commit: r1711702 - in /pig/branches/spark/src/org/apache/pig:
backend/hadoop/executionengine/spark/converter/LoadConverter.java
tools/pigstats/spark/SparkJobStats.java
tools/pigstats/spark/SparkPigStats.java
tools/pigstats/spark/SparkStatsUtil.java
Author: xuefu
Date: Sat Oct 31 22:49:40 2015
New Revision: 1711702
URL: http://svn.apache.org/viewvc?rev=1711702&view=rev
Log:
PIG-4655: Support InputStats in spark mode (Xianda via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java?rev=1711702&r1=1711701&r2=1711702&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/converter/LoadConverter.java Sat Oct 31 22:49:40 2015
@@ -25,6 +25,9 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import org.apache.pig.tools.pigstats.spark.SparkCounters;
+import org.apache.pig.tools.pigstats.spark.SparkPigStatusReporter;
+import org.apache.pig.tools.pigstats.spark.SparkStatsUtil;
import scala.Function1;
import scala.Tuple2;
import scala.runtime.AbstractFunction1;
@@ -89,8 +92,26 @@ public class LoadConverter implements RD
jobConf, PigInputFormatSpark.class, Text.class, Tuple.class);
registerUdfFiles();
+
+ ToTupleFunction ttf = new ToTupleFunction();
+
+ //create SparkCounter and set it for ToTupleFunction
+ if (!op.isTmpLoad()) {
+ String counterName = SparkStatsUtil.getLoadSparkCounterName(op);
+ SparkPigStatusReporter counterReporter = SparkPigStatusReporter.getInstance();
+ if (counterReporter.getCounters() != null) {
+ counterReporter.getCounters().createCounter(
+ SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP,
+ counterName);
+ }
+
+ ttf.setCounterGroupName(SparkStatsUtil.SPARK_INPUT_COUNTER_GROUP);
+ ttf.setCounterName(counterName);
+ ttf.setSparkCounters(SparkPigStatusReporter.getInstance().getCounters());
+ }
+
// map to get just RDD<Tuple>
- return hadoopRDD.map(new ToTupleFunction(),
+ return hadoopRDD.map(ttf,
SparkUtil.getManifest(Tuple.class));
}
@@ -113,10 +134,29 @@ public class LoadConverter implements RD
AbstractFunction1<Tuple2<Text, Tuple>, Tuple> implements
Function1<Tuple2<Text, Tuple>, Tuple>, Serializable {
+ // Counter group and counter name that apply() increments once per tuple;
+ // LoadConverter sets them together with sparkCounters for non-temporary loads.
+ private String counterGroupName;
+ private String counterName;
+ // Counters to update. When null (never set, or set to null), apply() does no counting.
+ private SparkCounters sparkCounters;
+
+ /**
+  * Returns the Tuple value of a (Text, Tuple) record read through
+  * PigInputFormatSpark, first incrementing the configured input-record
+  * counter by one when sparkCounters is non-null.
+  */
@Override
public Tuple apply(Tuple2<Text, Tuple> v1) {
+ if (sparkCounters != null) {
+ sparkCounters.increment(counterGroupName, counterName, 1L);
+ }
return v1._2();
}
+
+ /** Sets the Spark counter group that apply() increments. */
+ public void setCounterGroupName(String counterGroupName) {
+ this.counterGroupName = counterGroupName;
+ }
+
+ /** Sets the Spark counter name (within the group) that apply() increments. */
+ public void setCounterName(String counterName) {
+ this.counterName = counterName;
+ }
+
+ /** Sets the counters to update; passing null disables counting in apply(). */
+ public void setSparkCounters(SparkCounters sparkCounters) {
+ this.sparkCounters = sparkCounters;
+ }
}
/**
@@ -166,4 +206,5 @@ public class LoadConverter implements RD
return jobConf;
}
+
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1711702&r1=1711701&r2=1711702&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Sat Oct 31 22:49:40 2015
@@ -21,7 +21,7 @@ package org.apache.pig.tools.pigstats.sp
import java.util.List;
import java.util.Map;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import scala.Option;
import com.google.common.collect.Maps;
@@ -33,6 +33,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -70,6 +71,21 @@ public class SparkJobStats extends JobSt
}
}
+ /**
+  * Records an {@link InputStats} entry for one load of this job.
+  *
+  * @param po the load operator whose input is reported
+  * @param success whether the job that read this input succeeded
+  * @param singleInput true when callers determined this is the only POLoad
+  *        in the operator's plan; only then is the job-level "BytesRead"
+  *        metric attributed to this input
+  * @param conf configuration attached to the resulting InputStats
+  */
+ public void addInputStats(POLoad po, boolean success,
+         boolean singleInput,
+         Configuration conf) {
+     long recordsCount = SparkStatsUtil.getLoadSparkCounterValue(po);
+
+     // -1 marks the byte count as unknown. A job-level byte total cannot be
+     // split between several loads, so it is only used for a single input.
+     long bytesRead = -1;
+     if (singleInput) {
+         Long jobBytesRead = stats.get("BytesRead"); // single map lookup
+         if (jobBytesRead != null) {
+             bytesRead = jobBytesRead;
+         }
+     }
+
+     InputStats inputStats = new InputStats(po.getLFile().getFileName(),
+             bytesRead, recordsCount, success);
+     inputStats.setConf(conf);
+     inputs.add(inputStats);
+ }
public void collectStats(JobMetricsListener jobMetricsListener) {
if (jobMetricsListener != null) {
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1711702&r1=1711701&r2=1711702&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Sat Oct 31 22:49:40 2015
@@ -18,20 +18,25 @@
package org.apache.pig.tools.pigstats.spark;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.Properties;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.JobClient;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
@@ -43,6 +48,8 @@ public class SparkPigStats extends PigSt
private Map<SparkJobStats,SparkOperator> jobSparkOperatorMap = new HashMap<SparkJobStats, SparkOperator>();
private static final Log LOG = LogFactory.getLog(SparkPigStats.class);
+ private Set<SparkOperator> sparkOperatorsSet = new HashSet<SparkOperator>();
+
private SparkScriptState sparkScriptState;
public SparkPigStats() {
@@ -63,12 +70,14 @@ public class SparkPigStats extends PigSt
boolean isSuccess = SparkStatsUtil.isJobSuccess(jobId, sparkContext);
SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
jobStats.setSuccessful(isSuccess);
- jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
jobStats.collectStats(jobMetricsListener);
+ jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+ addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
}
+
public void addFailJobStats(POStore poStore, SparkOperator sparkOperator, String jobId,
JobMetricsListener jobMetricsListener,
JavaSparkContext sparkContext,
@@ -77,8 +86,9 @@ public class SparkPigStats extends PigSt
boolean isSuccess = false;
SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
jobStats.setSuccessful(isSuccess);
- jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
jobStats.collectStats(jobMetricsListener);
+ jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+ addInputInfoForSparkOper(sparkOperator, jobStats, isSuccess, jobMetricsListener, conf);
jobSparkOperatorMap.put(jobStats, sparkOperator);
jobPlan.add(jobStats);
if (e != null) {
@@ -158,4 +168,42 @@ public class SparkPigStats extends PigSt
public OutputStats result(String alias) {
return null;
}
+
+ /**
+  * Adds input statistics for every non-temporary {@link POLoad} in the given
+  * SparkOperator, at most once per operator.
+  * <p>
+  * A SparkPlan can contain many SparkOperators, and each SparkOperator can
+  * contain several POStores. Job stats are collected once per POStore, so this
+  * method may be called several times for the same operator, but input stats
+  * must be recorded only on the first call. For example, after multi-query
+  * optimization a SparkOperator may look like this:
+  * <pre>
+  * POLoad_1 (PhysicalPlan) ... POStore_A
+  *                 \          /
+  *                  ... POSplit
+  *                 /          \
+  * POLoad_2 (PhysicalPlan) ... POStore_B
+  * </pre>
+  *
+  * @param sparkOperator operator whose physical plan is scanned for loads
+  * @param jobStats job stats that receive the InputStats entries
+  * @param isSuccess whether the job succeeded
+  * @param jobMetricsListener currently unused
+  * @param conf configuration attached to each InputStats entry
+  */
+ private void addInputInfoForSparkOper(SparkOperator sparkOperator,
+         SparkJobStats jobStats,
+         boolean isSuccess,
+         JobMetricsListener jobMetricsListener,
+         Configuration conf) {
+     // Set.add returns false if the operator was already handled. Marking it
+     // up front also prevents duplicate InputStats if a later call would
+     // otherwise retry after a failure partway through the loop.
+     if (!sparkOperatorsSet.add(sparkOperator)) {
+         return;
+     }
+
+     try {
+         List<POLoad> poLoads = PlanHelper.getPhysicalOperators(sparkOperator.physicalPlan, POLoad.class);
+         // The count includes loads flagged as temporary (isTmpLoad).
+         boolean singleInput = (poLoads.size() == 1);
+         for (POLoad load : poLoads) {
+             if (!load.isTmpLoad()) {
+                 jobStats.addInputStats(load, isSuccess, singleInput, conf);
+             }
+         }
+     } catch (VisitorException ve) {
+         // Stats collection is best-effort; log with the cause so the stack trace is kept.
+         LOG.warn("Unable to collect input stats for spark operator "
+                 + sparkOperator.getOperatorKey(), ve);
+     }
+ }
+
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1711702&r1=1711701&r2=1711702&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Sat Oct 31 22:49:40 2015
@@ -19,6 +19,7 @@
package org.apache.pig.tools.pigstats.spark;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -29,8 +30,10 @@ import org.apache.spark.api.java.JavaSpa
public class SparkStatsUtil {
- public static final String SPARK_STORE_COUNTER_GROUP = "SparkStoreCounters";
+ public static final String SPARK_STORE_COUNTER_GROUP = "Spark Store Counters";
public static final String SPARK_STORE_RECORD_COUNTER = "Output records in ";
+ public static final String SPARK_INPUT_COUNTER_GROUP = "Spark Input Counters";
+ public static final String SPARK_INPUT_RECORD_COUNTER = "Input records from ";
public static void waitForJobAddStats(int jobID,
POStore poStore, SparkOperator sparkOperator,
@@ -75,11 +78,27 @@ public class SparkStatsUtil {
return sb.toString();
}
+ /**
+  * Builds the name of the Spark counter that counts records read by a load:
+  * {@code SPARK_INPUT_RECORD_COUNTER + "_" + operatorKey + "_" + shortFileName}.
+  * The same name is used when the counter is created and when its value is
+  * read back, so the format must stay stable.
+  *
+  * @param load the load operator
+  * @return the counter name for this load
+  */
+ public static String getLoadSparkCounterName(POLoad load) {
+     String shortName = PigStatsUtil.getShortName(load.getLFile().getFileName());
+
+     // Method-local buffer: StringBuilder avoids StringBuffer's needless locking.
+     StringBuilder sb = new StringBuilder(SPARK_INPUT_RECORD_COUNTER);
+     sb.append("_");
+     sb.append(load.getOperatorKey());
+     sb.append("_");
+     sb.append(shortName);
+     return sb.toString();
+ }
+
public static long getStoreSparkCounterValue(POStore store) {
SparkPigStatusReporter reporter = SparkPigStatusReporter.getInstance();
return reporter.getCounters().getValue(SPARK_STORE_COUNTER_GROUP, getStoreSparkCounterName(store));
}
+ /**
+  * Returns the number of records read by a load, as accumulated in its
+  * Spark input counter.
+  *
+  * @param load the load operator
+  * @return the counter value, or -1 when no Spark counters are available
+  */
+ public static long getLoadSparkCounterValue(POLoad load) {
+     SparkCounters counters = SparkPigStatusReporter.getInstance().getCounters();
+     // LoadConverter only creates the input counter when counters are non-null,
+     // so null counters mean the record count is unknown (-1), not an NPE.
+     if (counters == null) {
+         return -1;
+     }
+     return counters.getValue(SPARK_INPUT_COUNTER_GROUP, getLoadSparkCounterName(load));
+ }
+
public static boolean isJobSuccess(int jobID,
JavaSparkContext sparkContext) {
JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();