You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2016/07/04 07:00:21 UTC
svn commit: r1751219 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
test/org/apache/pig/test/TestGrunt.java
Author: praveen
Date: Mon Jul 4 07:00:21 2016
New Revision: 1751219
URL: http://svn.apache.org/viewvc?rev=1751219&view=rev
Log:
PIG-4927: Support stop.on.failure in spark mode (Liyun via Praveen)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1751219&r1=1751218&r2=1751219&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Mon Jul 4 07:00:21 2016
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
@@ -51,6 +52,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.DependencyOrderWalker;
import org.apache.pig.impl.plan.OperatorKey;
@@ -76,8 +78,11 @@ public class JobGraphBuilder extends Spa
private Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
private Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
private JobConf jobConf = null;
+ private PigContext pc;
- public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap, SparkPigStats sparkStats, JavaSparkContext sparkContext, JobMetricsListener jobMetricsListener, String jobGroupID, JobConf jobConf) {
+ public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
+ SparkPigStats sparkStats, JavaSparkContext sparkContext, JobMetricsListener
+ jobMetricsListener, String jobGroupID, JobConf jobConf, PigContext pc) {
super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true));
this.sparkPlan = plan;
this.convertMap = convertMap;
@@ -86,6 +91,7 @@ public class JobGraphBuilder extends Spa
this.jobMetricsListener = jobMetricsListener;
this.jobGroupID = jobGroupID;
this.jobConf = jobConf;
+ this.pc = pc;
}
@Override
@@ -98,7 +104,9 @@ public class JobGraphBuilder extends Spa
finishUDFs(sparkOp.physicalPlan);
} catch (InterruptedException e) {
throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
- } catch (JobCreationException e){
+ } catch (JobCreationException e) {
+ throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
+ } catch (ExecException e) {
throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
} catch (IOException e) {
throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
@@ -161,7 +169,7 @@ public class JobGraphBuilder extends Spa
}
}
- private void sparkOperToRDD(SparkOperator sparkOperator) throws InterruptedException, VisitorException, JobCreationException {
+ private void sparkOperToRDD(SparkOperator sparkOperator) throws InterruptedException, VisitorException, JobCreationException, ExecException {
List<SparkOperator> predecessors = sparkPlan
.getPredecessors(sparkOperator);
Set<OperatorKey> predecessorOfPreviousSparkOp = new HashSet<OperatorKey>();
@@ -192,6 +200,14 @@ public class JobGraphBuilder extends Spa
LOG.error("throw exception in sparkOperToRDD: ", e);
exception = e;
isFail = true;
+ boolean stopOnFailure = Boolean.valueOf(pc
+ .getProperties().getProperty("stop.on.failure",
+ "false"));
+ if (stopOnFailure) {
+ int errCode = 6017;
+ throw new ExecException(e.getMessage(), errCode,
+ PigException.REMOTE_ENVIRONMENT);
+ }
}
}
@@ -334,4 +350,4 @@ public class JobGraphBuilder extends Spa
seenJobIDs.addAll(unseenJobIDs);
return unseenJobIDs;
}
-}
\ No newline at end of file
+}
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1751219&r1=1751218&r2=1751219&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Jul 4 07:00:21 2016
@@ -215,7 +215,7 @@ public class SparkLauncher extends Launc
convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
uploadResources(sparkplan);
- new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf).visit();
+ new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit();
cleanUpSparkJob(sparkStats);
sparkStats.finish();
return sparkStats;
Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1751219&r1=1751218&r2=1751219&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Mon Jul 4 07:00:21 2016
@@ -998,7 +998,13 @@ public class TestGrunt {
grunt.exec();
} catch (PigException e) {
caught = true;
- assertTrue(e.getErrorCode() == 6017);
+ if (!Util.isSparkExecType(cluster.getExecType())) {
+ assertTrue(e.getErrorCode() == 6017);
+ } else {
+ // In spark mode, the ExecException thrown by JobGraphBuilder#sparkOperToRDD is wrapped in a RuntimeException,
+ // so unwrap it here.
+ assertTrue(((ExecException) e.getCause().getCause()).getErrorCode() == 6017);
+ }
}
assertFalse(server.existsFile("done"));