Posted to commits@pig.apache.org by xu...@apache.org on 2015/03/11 02:27:06 UTC
svn commit: r1665753 - in /pig/branches/spark/src/org/apache/pig: backend/hadoop/executionengine/spark/ backend/hadoop/executionengine/spark/optimizer/ backend/hadoop/executionengine/util/ tools/pigstats/spark/
Author: xuefu
Date: Wed Mar 11 01:27:06 2015
New Revision: 1665753
URL: http://svn.apache.org/r1665753
Log:
PIG-4269: Enable unit test TestAccumulator for spark (Liyun via Xuefu)
Added:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Mar 11 01:27:06 2015
@@ -25,7 +25,6 @@ import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
-import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
@@ -86,6 +85,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -106,6 +106,7 @@ import org.apache.spark.api.java.JavaSpa
import org.apache.spark.rdd.RDD;
import org.apache.spark.scheduler.JobLogger;
import org.apache.spark.scheduler.StatsReportListener;
+import org.apache.spark.SparkException;
/**
* Main class that launches pig for Spark
@@ -210,6 +211,15 @@ public class SparkLauncher extends Launc
return sparkStats;
}
+ private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
+ boolean isAccum =
+ Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator", "true"));
+ if (isAccum) {
+ AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+ accum.visit();
+ }
+ }
+
/**
* In Spark, currently only async actions return job id. There is no async
* equivalent of actions like saveAsNewAPIHadoopFile()
@@ -396,6 +406,8 @@ public class SparkLauncher extends Launc
SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(
sparkPlan);
pkgAnnotator.visit();
+
+ optimize(pigContext, sparkPlan);
return sparkPlan;
}
@@ -501,7 +513,6 @@ public class SparkLauncher extends Launc
Map<Class<? extends PhysicalOperator>, POConverter> convertMap,
Set<Integer> seenJobIDs, SparkPigStats sparkStats, JobConf conf)
throws IOException, InterruptedException {
-
List<SparkOperator> predecessors = sparkPlan
.getPredecessors(sparkOperator);
List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
@@ -517,6 +528,8 @@ public class SparkLauncher extends Launc
}
List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
+ boolean isFail = false;
+ Exception exception = null;
if (leafPOs != null && leafPOs.size() != 1) {
throw new IllegalArgumentException(
String.format(
@@ -528,19 +541,34 @@ public class SparkLauncher extends Launc
sparkOperator.name()));
} else {
PhysicalOperator leafPO = leafPOs.get(0);
- physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
- predecessorRDDs, convertMap);
- sparkOpRdds.put(sparkOperator.getOperatorKey(),
- physicalOpRdds.get(leafPO.getOperatorKey()));
+ try {
+ physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+ predecessorRDDs, convertMap);
+ sparkOpRdds.put(sparkOperator.getOperatorKey(),
+ physicalOpRdds.get(leafPO.getOperatorKey()));
+ } catch (Exception e) {
+ if (e instanceof SparkException) {
+ LOG.info("Caught SparkException: error found when running " +
+ "RDDs in Spark");
+ }
+ exception = e;
+ isFail = true;
+ }
}
List<POStore> poStores = PlanHelper.getPhysicalOperators(
sparkOperator.physicalPlan, POStore.class);
if (poStores != null && poStores.size() == 1) {
POStore poStore = poStores.get(0);
- for (int jobID : getJobIDs(seenJobIDs)) {
- SparkStatsUtil.waitForJobAddStats(jobID, poStore,
- jobMetricsListener, sparkContext, sparkStats, conf);
+ if (!isFail) {
+ for (int jobID : getJobIDs(seenJobIDs)) {
+ SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+ jobMetricsListener, sparkContext, sparkStats, conf);
+ }
+ } else {
+ String failJobID = sparkOperator.name().concat("_fail");
+ SparkStatsUtil.addFailJobStats(failJobID, poStore, sparkStats,
+ conf, exception);
}
} else {
LOG.info(String
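[Editor's note] The optimize() hook added above is gated by the "opt.accumulator" property, which defaults to true. A minimal sketch of turning the optimizer off from client code; the class name DisableAccumulatorExample is hypothetical, while PigContext, ExecType, and the property key come from this commit:

    import java.util.Properties;

    import org.apache.pig.ExecType;
    import org.apache.pig.impl.PigContext;

    public class DisableAccumulatorExample {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // SparkLauncher.optimize() reads this property (default "true");
            // "false" makes it skip the AccumulatorOptimizer pass.
            props.setProperty("opt.accumulator", "false");
            // pc would then be handed to the launcher as usual.
            PigContext pc = new PigContext(ExecType.LOCAL, props);
        }
    }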
Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java?rev=1665753&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java Wed Mar 11 01:27:06 2015
@@ -0,0 +1,26 @@
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A plan optimizer visitor that determines whether a vertex plan can run
+ * in accumulative mode.
+ */
+public class AccumulatorOptimizer extends SparkOpPlanVisitor {
+
+ public AccumulatorOptimizer(SparkOperPlan plan) {
+ super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+ }
+
+ @Override
+ public void visitSparkOp(SparkOperator sparkOperator) throws
+ VisitorException {
+ AccumulatorOptimizerUtil.addAccumulatorSpark(sparkOperator
+ .physicalPlan);
+ }
+}
\ No newline at end of file
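[Editor's note] "Accumulative mode" refers to Pig's org.apache.pig.Accumulator contract: rather than materializing the whole bag for a key, the runtime feeds the UDF partial bags through accumulate() and reads the final result from getValue(). A hedged sketch of a COUNT-like UDF that supports the mode (the class name CountAccum is illustrative; the interface methods are Pig's):

    import java.io.IOException;

    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class CountAccum extends EvalFunc<Long> implements Accumulator<Long> {
        private long count = 0L;

        // Called repeatedly with partial bags when the plan runs accumulatively.
        @Override
        public void accumulate(Tuple b) throws IOException {
            for (Tuple t : (DataBag) b.get(0)) {
                count++;
            }
        }

        @Override
        public Long getValue() {
            return count;
        }

        @Override
        public void cleanup() {
            count = 0L;
        }

        // Fallback for plans that do not run in accumulative mode.
        @Override
        public Long exec(Tuple input) throws IOException {
            cleanup();
            accumulate(input);
            Long result = getValue();
            cleanup();
            return result;
        }
    }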
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Wed Mar 11 01:27:06 2015
@@ -37,11 +37,14 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
import org.apache.pig.data.DataType;
import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.VisitorException;
public class AccumulatorOptimizerUtil {
private static final Log LOG = LogFactory.getLog(AccumulatorOptimizerUtil.class);
@@ -286,4 +289,85 @@ public class AccumulatorOptimizerUtil {
return false;
}
+
+ public static void addAccumulatorSpark(PhysicalPlan plan) throws
+ VisitorException {
+ List<PhysicalOperator> pos = plan.getRoots();
+ if (pos == null || pos.size() == 0) {
+ return;
+ }
+
+ // See if this is a POGlobalRearrange
+ PhysicalOperator po_globalRearrange = pos.get(0);
+ if (!po_globalRearrange.getClass().equals(POGlobalRearrange.class)) {
+ return;
+ }
+
+ List<PhysicalOperator> poPackages = plan.getSuccessors(po_globalRearrange);
+
+ if (poPackages == null || poPackages.size() == 0) {
+ return;
+ }
+ // See if this is a POPackage
+ PhysicalOperator po_package = poPackages.get(0);
+ if (!po_package.getClass().equals(POPackage.class)) {
+ return;
+ }
+
+ Packager pkgr = ((POPackage) po_package).getPkgr();
+ // Check that this is a standard package, not a subclass
+ if (!pkgr.getClass().equals(Packager.class)) {
+ return;
+ }
+
+ // if POPackage is for distinct, just return
+ if (pkgr.isDistinct()) {
+ return;
+ }
+
+ // if any input to POPackage is inner, just return
+ boolean[] isInner = pkgr.getInner();
+ for (boolean b : isInner) {
+ if (b) {
+ return;
+ }
+ }
+
+ List<PhysicalOperator> l = plan.getSuccessors(po_package);
+ // there should be only one POForEach
+ if (l == null || l.size() != 1) {
+ return;
+ }
+
+ PhysicalOperator po_foreach = l.get(0);
+ if (!(po_foreach instanceof POForEach)) {
+ return;
+ }
+
+ boolean foundUDF = false;
+ List<PhysicalPlan> list = ((POForEach) po_foreach).getInputPlans();
+ for (PhysicalPlan p : list) {
+ PhysicalOperator po = p.getLeaves().get(0);
+
+ // only expression operators are allowed
+ if (!(po instanceof ExpressionOperator)) {
+ return;
+ }
+
+ if (((ExpressionOperator) po).containUDF()) {
+ foundUDF = true;
+ }
+
+ if (!check(po)) {
+ return;
+ }
+ }
+
+ if (foundUDF) {
+ // if all tests are passed, reducer can run in accumulative mode
+ LOG.info("Reducer is to run in accumulative mode.");
+ po_package.setAccumulative();
+ po_foreach.setAccumulative();
+ }
+ }
}
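[Editor's note] The plan shape addAccumulatorSpark() accepts (a POGlobalRearrange root, a plain non-distinct POPackage with no inner inputs, then a single POForEach whose expression plans contain at least one UDF) corresponds to an ordinary GROUP ... BY followed by a FOREACH over an accumulative function. A hedged sketch of a qualifying script driven through PigServer; the paths 'input' and 'output' are placeholders, and COUNT is one of Pig's built-ins that implements Accumulator:

    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;

    public class AccumulatorShapeExample {
        public static void main(String[] args) throws Exception {
            PigServer pig = new PigServer(ExecType.LOCAL);
            // "true" is already the default; shown here for clarity.
            pig.getPigContext().getProperties()
                    .setProperty("opt.accumulator", "true");
            pig.registerQuery("A = LOAD 'input' AS (k:chararray, v:int);");
            pig.registerQuery("B = GROUP A BY k;");
            // COUNT over a grouped bag is the accumulative UDF the check looks for.
            pig.registerQuery("C = FOREACH B GENERATE group, COUNT(A);");
            pig.store("C", "output");
        }
    }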
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Mar 11 01:27:06 2015
@@ -66,14 +66,15 @@ public class SparkJobStats extends JobSt
}
}
- public void collectStats(JobMetricsListener jobMetricsListener) {
- Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
- if (taskMetrics == null) {
- throw new RuntimeException("No task metrics available for jobId " + jobId);
- }
-
- stats = combineTaskMetrics(taskMetrics);
- }
+ public void collectStats(JobMetricsListener jobMetricsListener) {
+ if (jobMetricsListener != null) {
+ Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
+ if (taskMetrics == null) {
+ throw new RuntimeException("No task metrics available for jobId " + jobId);
+ }
+ stats = combineTaskMetrics(taskMetrics);
+ }
+ }
public Map<String, Long> getStats() {
return stats;
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Mar 11 01:27:06 2015
@@ -36,6 +36,22 @@ public class SparkPigStats extends PigSt
jobPlan.add(jobStats);
}
+ public void addFailJobStats(POStore poStore, String jobId,
+ JobMetricsListener jobMetricsListener,
+ JavaSparkContext sparkContext,
+ Configuration conf,
+ Exception e) {
+ boolean isSuccess = false;
+ SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+ jobStats.setSuccessful(isSuccess);
+ jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+ jobStats.collectStats(jobMetricsListener);
+ jobPlan.add(jobStats);
+ if (e != null) {
+ jobStats.setBackendException(e);
+ }
+ }
+
public void finish() {
super.stop();
display();
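[Editor's note] Taken together with the SparkLauncher change above, the failure path is: physicalToRDD() throws, SparkLauncher records the exception and calls SparkStatsUtil.addFailJobStats() (modified below) with the synthetic job id "<operator name>_fail", and addFailJobStats() here builds a SparkJobStats that is marked unsuccessful and carries the backend exception. The null check added to SparkJobStats.collectStats() exists precisely because this path passes a null JobMetricsListener.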
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Mar 11 01:27:06 2015
@@ -43,10 +43,20 @@ public class SparkStatsUtil {
// To workaround this, we will wait for this job to "finish".
jobMetricsListener.waitForJobToEnd(jobID);
sparkPigStats.addJobStats(poStore, jobID, jobMetricsListener,
- sparkContext, jobConf);
+ sparkContext, jobConf);
jobMetricsListener.cleanup(jobID);
}
+ public static void addFailJobStats(String jobID,
+ POStore poStore,
+ SparkPigStats sparkPigStats,
+ JobConf jobConf, Exception e) {
+ JobMetricsListener jobMetricsListener = null;
+ JavaSparkContext sparkContext = null;
+ sparkPigStats.addFailJobStats(poStore, jobID, jobMetricsListener,
+ sparkContext, jobConf, e);
+ }
+
public static boolean isJobSuccess(int jobID,
JavaSparkContext sparkContext) {
JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();