Posted to commits@pig.apache.org by xu...@apache.org on 2015/03/11 02:27:06 UTC

svn commit: r1665753 - in /pig/branches/spark/src/org/apache/pig: backend/hadoop/executionengine/spark/ backend/hadoop/executionengine/spark/optimizer/ backend/hadoop/executionengine/util/ tools/pigstats/spark/

Author: xuefu
Date: Wed Mar 11 01:27:06 2015
New Revision: 1665753

URL: http://svn.apache.org/r1665753
Log:
PIG-4269: Enable unit test TestAccumulator for spark (Liyun via Xuefu)

Added:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Wed Mar 11 01:27:06 2015
@@ -25,7 +25,6 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -86,6 +85,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.StreamConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.converter.UnionConverter;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.POStreamSpark;
+import org.apache.pig.backend.hadoop.executionengine.spark.optimizer.AccumulatorOptimizer;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkCompiler;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -106,6 +106,7 @@ import org.apache.spark.api.java.JavaSpa
 import org.apache.spark.rdd.RDD;
 import org.apache.spark.scheduler.JobLogger;
 import org.apache.spark.scheduler.StatsReportListener;
+import org.apache.spark.SparkException;
 
 /**
  * Main class that launches pig for Spark
@@ -210,6 +211,15 @@ public class SparkLauncher extends Launc
 		return sparkStats;
 	}
 
+	private void optimize(PigContext pc, SparkOperPlan plan) throws VisitorException {
+		boolean isAccum =
+				Boolean.valueOf(pc.getProperties().getProperty("opt.accumulator","true"));
+		if (isAccum) {
+			AccumulatorOptimizer accum = new AccumulatorOptimizer(plan);
+			accum.visit();
+		}
+	}
+
 	/**
 	 * In Spark, currently only async actions return job id. There is no async
 	 * equivalent of actions like saveAsNewAPIHadoopFile()
@@ -396,6 +406,8 @@ public class SparkLauncher extends Launc
 		SparkPOPackageAnnotator pkgAnnotator = new SparkPOPackageAnnotator(
 				sparkPlan);
 		pkgAnnotator.visit();
+
+		optimize(pigContext, sparkPlan);
 		return sparkPlan;
 	}
 
@@ -501,7 +513,6 @@ public class SparkLauncher extends Launc
 			Map<Class<? extends PhysicalOperator>, POConverter> convertMap,
 			Set<Integer> seenJobIDs, SparkPigStats sparkStats, JobConf conf)
 			throws IOException, InterruptedException {
-
 		List<SparkOperator> predecessors = sparkPlan
 				.getPredecessors(sparkOperator);
 		List<RDD<Tuple>> predecessorRDDs = Lists.newArrayList();
@@ -517,6 +528,8 @@ public class SparkLauncher extends Launc
 		}
 
 		List<PhysicalOperator> leafPOs = sparkOperator.physicalPlan.getLeaves();
+		boolean isFail = false;
+		Exception exception = null;
 		if (leafPOs != null && leafPOs.size() != 1) {
 			throw new IllegalArgumentException(
 					String.format(
@@ -528,19 +541,34 @@ public class SparkLauncher extends Launc
 							sparkOperator.name()));
 		} else {
 			PhysicalOperator leafPO = leafPOs.get(0);
-			physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
-					predecessorRDDs, convertMap);
-			sparkOpRdds.put(sparkOperator.getOperatorKey(),
-					physicalOpRdds.get(leafPO.getOperatorKey()));
+			try {
+				physicalToRDD(sparkOperator.physicalPlan, leafPO, physicalOpRdds,
+						predecessorRDDs, convertMap);
+				sparkOpRdds.put(sparkOperator.getOperatorKey(),
+						physicalOpRdds.get(leafPO.getOperatorKey()));
+			}catch(Exception e) {
+				if( e instanceof  SparkException) {
+					LOG.info("SparkException thrown; error found when " +
+							"running RDDs in Spark");
+				}
+				exception = e;
+				isFail = true;
+			}
 		}
 
 		List<POStore> poStores = PlanHelper.getPhysicalOperators(
 				sparkOperator.physicalPlan, POStore.class);
 		if (poStores != null && poStores.size() == 1) {
 			POStore poStore = poStores.get(0);
-			for (int jobID : getJobIDs(seenJobIDs)) {
-				SparkStatsUtil.waitForJobAddStats(jobID, poStore,
-						jobMetricsListener, sparkContext, sparkStats, conf);
+			if( isFail == false ) {
+				for (int jobID : getJobIDs(seenJobIDs)) {
+					SparkStatsUtil.waitForJobAddStats(jobID, poStore,
+							jobMetricsListener, sparkContext, sparkStats, conf);
+				}
+			}else{
+				String failJobID =sparkOperator.name().concat("_fail");
+				SparkStatsUtil.addFailJobStats(failJobID, poStore,sparkStats,
+						conf,exception);
 			}
 		} else {
 			LOG.info(String
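
A note on the new optimize() hook above: it is gated by the "opt.accumulator" property, which defaults to "true". The sketch below shows one way the pass could be switched off when building a PigContext programmatically. The property name and default are taken from the diff; constructing a PigContext with ExecType.LOCAL is only an illustrative assumption to keep the snippet self-contained, not how SparkLauncher itself obtains its context.

    // Minimal sketch (not part of the commit): disabling the AccumulatorOptimizer
    // pass via the "opt.accumulator" property read by SparkLauncher.optimize().
    import java.util.Properties;
    import org.apache.pig.ExecType;
    import org.apache.pig.impl.PigContext;

    public class DisableAccumulatorOptimizer {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.setProperty("opt.accumulator", "false");   // default is "true"

            // ExecType.LOCAL is used here only so the sketch is runnable on its own.
            PigContext pc = new PigContext(ExecType.LOCAL, props);

            // Mirrors the check in SparkLauncher.optimize(): with "false" the
            // AccumulatorOptimizer visitor is never instantiated or visited.
            boolean isAccum = Boolean.valueOf(
                    pc.getProperties().getProperty("opt.accumulator", "true"));
            System.out.println("accumulator optimization enabled: " + isAccum);
        }
    }

Leaving the property at its default keeps the behavior introduced by this commit; setting it to "false" leaves the Spark operator plan untouched by this pass.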

Added: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java?rev=1665753&view=auto
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java (added)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/optimizer/AccumulatorOptimizer.java Wed Mar 11 01:27:06 2015
@@ -0,0 +1,26 @@
+package org.apache.pig.backend.hadoop.executionengine.spark.optimizer;
+
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOpPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
+import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
+import org.apache.pig.backend.hadoop.executionengine.util.AccumulatorOptimizerUtil;
+import org.apache.pig.impl.plan.DepthFirstWalker;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor to optimize plans that determines if a vertex plan can run in
+ * accumulative mode.
+ */
+public class AccumulatorOptimizer extends SparkOpPlanVisitor {
+
+    public AccumulatorOptimizer(SparkOperPlan plan) {
+		super(plan, new DepthFirstWalker<SparkOperator, SparkOperPlan>(plan));
+    }
+
+    @Override
+    public void visitSparkOp(SparkOperator sparkOperator) throws
+			VisitorException {
+        AccumulatorOptimizerUtil.addAccumulatorSpark(sparkOperator
+                .physicalPlan);
+    }
+}
\ No newline at end of file

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/util/AccumulatorOptimizerUtil.java Wed Mar 11 01:27:06 2015
@@ -37,11 +37,14 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.UnaryExpressionOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSortedDistinct;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.Packager;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.util.PlanHelper;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.VisitorException;
 
 public class AccumulatorOptimizerUtil {
     private static final Log LOG = LogFactory.getLog(AccumulatorOptimizerUtil.class);
@@ -286,4 +289,85 @@ public class AccumulatorOptimizerUtil {
 
         return false;
     }
+
+    public static void addAccumulatorSpark(PhysicalPlan plan) throws
+            VisitorException {
+        List<PhysicalOperator> pos = plan.getRoots();
+        if (pos == null || pos.size() == 0) {
+            return;
+        }
+
+        // See if this is a POGlobalRearrange
+        PhysicalOperator po_globalRearrange = pos.get(0);
+        if (!po_globalRearrange.getClass().equals(POGlobalRearrange.class)) {
+            return;
+        }
+
+        List<PhysicalOperator> poPackages = plan.getSuccessors(po_globalRearrange);
+
+        if (poPackages == null || poPackages.size() == 0) {
+            return;
+        }
+        // See if this is a POPackage
+        PhysicalOperator po_package = poPackages.get(0);
+        if (!po_package.getClass().equals(POPackage.class)) {
+            return;
+        }
+
+        Packager pkgr = ((POPackage) po_package).getPkgr();
+        // Check that this is a standard package, not a subclass
+        if (!pkgr.getClass().equals(Packager.class)) {
+            return;
+        }
+
+        // if POPackage is for distinct, just return
+        if (pkgr.isDistinct()) {
+            return;
+        }
+
+        // if any input to POPackage is inner, just return
+        boolean[] isInner = pkgr.getInner();
+        for (boolean b : isInner) {
+            if (b) {
+                return;
+            }
+        }
+
+        List<PhysicalOperator> l = plan.getSuccessors(po_package);
+        // there should be only one POForEach
+        if (l == null || l.size() == 0 || l.size() > 1) {
+            return;
+        }
+
+        PhysicalOperator po_foreach = l.get(0);
+        if (!(po_foreach instanceof POForEach)) {
+            return;
+        }
+
+        boolean foundUDF = false;
+        List<PhysicalPlan> list = ((POForEach) po_foreach).getInputPlans();
+        for (PhysicalPlan p : list) {
+            PhysicalOperator po = p.getLeaves().get(0);
+
+            // only expression operators are allowed
+            if (!(po instanceof ExpressionOperator)) {
+                return;
+            }
+
+            if (((ExpressionOperator) po).containUDF()) {
+                foundUDF = true;
+            }
+
+            if (!check(po)) {
+                return;
+            }
+        }
+
+        if (foundUDF) {
+            // if all tests are passed, reducer can run in accumulative mode
+            LOG.info("Reducer is to run in accumulative mode.");
+            po_package.setAccumulative();
+            po_foreach.setAccumulative();
+        }
+    }
 }
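
For context on what addAccumulatorSpark() enables: "accumulative mode" means the grouped bag behind POPackage/POForEach is fed to eligible UDFs in increments rather than being materialized all at once, and a UDF opts in by implementing org.apache.pig.Accumulator in addition to EvalFunc. Below is a minimal sketch of such a UDF, assuming the standard Accumulator contract (accumulate/getValue/cleanup) from Pig's public UDF API; it is illustrative only and not part of this commit.

    // Illustrative UDF sketch (assumed API: org.apache.pig.Accumulator and
    // org.apache.pig.EvalFunc): counts tuples in a grouped bag either all at
    // once (exec) or chunk by chunk (accumulate).
    import java.io.IOException;
    import org.apache.pig.Accumulator;
    import org.apache.pig.EvalFunc;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;

    public class AccumulativeCount extends EvalFunc<Long> implements Accumulator<Long> {
        private long count = 0;

        @Override   // regular path: the whole bag is passed in one call
        public Long exec(Tuple input) throws IOException {
            DataBag bag = (DataBag) input.get(0);
            return bag.size();
        }

        @Override   // accumulative path: called repeatedly with partial bags
        public void accumulate(Tuple input) throws IOException {
            DataBag bag = (DataBag) input.get(0);
            count += bag.size();
        }

        @Override
        public Long getValue() {
            return count;
        }

        @Override
        public void cleanup() {
            count = 0;
        }
    }

When all of the checks above pass, po_package.setAccumulative() and po_foreach.setAccumulative() flag the operators so that UDFs of this kind are driven through accumulate()/getValue() instead of a single exec() call on the full bag.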

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Wed Mar 11 01:27:06 2015
@@ -66,14 +66,15 @@ public class SparkJobStats extends JobSt
       }
   }
 
-  public void collectStats(JobMetricsListener jobMetricsListener) {
-      Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
-      if (taskMetrics == null) {
-        throw new RuntimeException("No task metrics available for jobId " + jobId);
-      }
-
-      stats = combineTaskMetrics(taskMetrics);
-  }
+	public void collectStats(JobMetricsListener jobMetricsListener) {
+		if (jobMetricsListener != null) {
+			Map<String, List<TaskMetrics>> taskMetrics = jobMetricsListener.getJobMetric(jobId);
+			if (taskMetrics == null) {
+				throw new RuntimeException("No task metrics available for jobId " + jobId);
+			}
+			stats = combineTaskMetrics(taskMetrics);
+		}
+	}
 
   public Map<String, Long> getStats() {
       return stats;

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkPigStats.java Wed Mar 11 01:27:06 2015
@@ -36,6 +36,22 @@ public class SparkPigStats extends PigSt
         jobPlan.add(jobStats);
     }
 
+    public void addFailJobStats(POStore poStore, String jobId,
+                                JobMetricsListener jobMetricsListener,
+                                JavaSparkContext sparkContext,
+                                Configuration conf,
+                                Exception e) {
+        boolean isSuccess = false;
+        SparkJobStats jobStats = new SparkJobStats(jobId, jobPlan);
+        jobStats.setSuccessful(isSuccess);
+        jobStats.addOutputInfo(poStore, isSuccess, jobMetricsListener, conf);
+        jobStats.collectStats(jobMetricsListener);
+        jobPlan.add(jobStats);
+        if (e != null) {
+            jobStats.setBackendException(e);
+        }
+    }
+
     public void finish() {
         super.stop();
         display();

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1665753&r1=1665752&r2=1665753&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Mar 11 01:27:06 2015
@@ -43,10 +43,20 @@ public class SparkStatsUtil {
       // To workaround this, we will wait for this job to "finish".
       jobMetricsListener.waitForJobToEnd(jobID);
       sparkPigStats.addJobStats(poStore, jobID, jobMetricsListener,
-          sparkContext, jobConf);
+              sparkContext, jobConf);
       jobMetricsListener.cleanup(jobID);
   }
 
+    public static void addFailJobStats(String jobID,
+                                       POStore poStore,
+                                       SparkPigStats sparkPigStats,
+                                       JobConf jobConf, Exception e) {
+        JobMetricsListener jobMetricsListener = null;
+        JavaSparkContext sparkContext = null;
+        sparkPigStats.addFailJobStats(poStore, jobID, jobMetricsListener,
+                sparkContext, jobConf, e);
+    }
+
   public static boolean isJobSuccess(int jobID,
                                     JavaSparkContext sparkContext) {
       JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();