Posted to commits@pig.apache.org by pr...@apache.org on 2016/07/04 07:00:21 UTC

svn commit: r1751219 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java test/org/apache/pig/test/TestGrunt.java

Author: praveen
Date: Mon Jul  4 07:00:21 2016
New Revision: 1751219

URL: http://svn.apache.org/viewvc?rev=1751219&view=rev
Log:
PIG-4927: Support stop.on.failure in spark mode (Liyun via Praveen)
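
For context, stop.on.failure makes Pig abort a multi-job script as soon as one of its jobs fails instead of carrying on with the remaining jobs; it is the property behind the -stop_on_failure (-F) command-line flag. Below is a minimal sketch of driving this from Java. It is illustrative only: the "spark" exec type string is assumed to resolve on this branch, and the input/output paths are placeholders.

    // Sketch only: enable stop.on.failure before running a script on the Spark backend.
    import org.apache.pig.PigServer;

    public class StopOnFailureDemo {
        public static void main(String[] args) throws Exception {
            // "spark" exec type string assumed to be registered on this branch.
            PigServer pig = new PigServer("spark");
            // Property consulted by JobGraphBuilder#sparkOperToRDD in this change.
            pig.getPigContext().getProperties().setProperty("stop.on.failure", "true");
            pig.registerQuery("A = LOAD 'input.txt' AS (line:chararray);");
            pig.registerQuery("B = FILTER A BY line IS NOT NULL;");
            // If the Spark job behind this STORE fails, the script now aborts with error 6017.
            pig.store("B", "output");
        }
    }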

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/test/org/apache/pig/test/TestGrunt.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1751219&r1=1751218&r2=1751219&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Mon Jul  4 07:00:21 2016
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.pig.PigException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PhyPlanSetter;
@@ -51,6 +52,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperPlan;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
 import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.DependencyOrderWalker;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -76,8 +78,11 @@ public class JobGraphBuilder extends Spa
     private Map<OperatorKey, RDD<Tuple>> sparkOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
     private Map<OperatorKey, RDD<Tuple>> physicalOpRdds = new HashMap<OperatorKey, RDD<Tuple>>();
     private JobConf jobConf = null;
+    private PigContext pc;
 
-    public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap, SparkPigStats sparkStats, JavaSparkContext sparkContext, JobMetricsListener jobMetricsListener, String jobGroupID, JobConf jobConf) {
+    public JobGraphBuilder(SparkOperPlan plan, Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap,
+                           SparkPigStats sparkStats, JavaSparkContext sparkContext, JobMetricsListener
+            jobMetricsListener, String jobGroupID, JobConf jobConf, PigContext pc) {
         super(plan, new DependencyOrderWalker<SparkOperator, SparkOperPlan>(plan, true));
         this.sparkPlan = plan;
         this.convertMap = convertMap;
@@ -86,6 +91,7 @@ public class JobGraphBuilder extends Spa
         this.jobMetricsListener = jobMetricsListener;
         this.jobGroupID = jobGroupID;
         this.jobConf = jobConf;
+        this.pc = pc;
     }
 
     @Override
@@ -98,7 +104,9 @@ public class JobGraphBuilder extends Spa
             finishUDFs(sparkOp.physicalPlan);
         } catch (InterruptedException e) {
             throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
-        } catch (JobCreationException e){
+        } catch (JobCreationException e) {
+            throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
+        } catch (ExecException e) {
             throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
         } catch (IOException e) {
             throw new RuntimeException("fail to get the rdds of this spark operator: ", e);
@@ -161,7 +169,7 @@ public class JobGraphBuilder extends Spa
         }
     }
 
-    private void sparkOperToRDD(SparkOperator sparkOperator) throws InterruptedException, VisitorException, JobCreationException {
+    private void sparkOperToRDD(SparkOperator sparkOperator) throws InterruptedException, VisitorException, JobCreationException, ExecException {
         List<SparkOperator> predecessors = sparkPlan
                 .getPredecessors(sparkOperator);
         Set<OperatorKey> predecessorOfPreviousSparkOp = new HashSet<OperatorKey>();
@@ -192,6 +200,14 @@ public class JobGraphBuilder extends Spa
                     LOG.error("throw exception in sparkOperToRDD: ", e);
                     exception = e;
                     isFail = true;
+                    boolean stopOnFailure = Boolean.valueOf(pc
+                            .getProperties().getProperty("stop.on.failure",
+                                    "false"));
+                    if (stopOnFailure) {
+                        int errCode = 6017;
+                        throw new ExecException(e.getMessage(), errCode,
+                                PigException.REMOTE_ENVIRONMENT);
+                    }
                 }
             }
 
@@ -334,4 +350,4 @@ public class JobGraphBuilder extends Spa
         seenJobIDs.addAll(unseenJobIDs);
         return unseenJobIDs;
     }
-}
\ No newline at end of file
+}
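
Pulled out of the visitor for readability, the behaviour added to sparkOperToRDD amounts to this policy: when a Spark job fails, consult stop.on.failure (default false) and either rethrow the failure as ExecException 6017 from the remote environment, or record it and let the remaining operators run. The class and method names below are illustrative and not part of the patch; only the property name, error code, and error source come from the hunk above.

    // Illustrative restatement of the check added above; names are not from the patch.
    import java.util.Properties;

    import org.apache.pig.PigException;
    import org.apache.pig.backend.executionengine.ExecException;

    public class StopOnFailurePolicy {
        private final Properties properties;

        public StopOnFailurePolicy(Properties properties) {
            this.properties = properties;
        }

        // Called when the Spark job backing a STORE has failed.
        public void onJobFailure(Exception jobFailure) throws ExecException {
            boolean stopOnFailure = Boolean.valueOf(
                    properties.getProperty("stop.on.failure", "false"));
            if (stopOnFailure) {
                int errCode = 6017;  // same code the test below expects in MR mode
                throw new ExecException(jobFailure.getMessage(), errCode,
                        PigException.REMOTE_ENVIRONMENT);
            }
            // Otherwise the failure is only recorded and execution continues.
        }
    }

In the actual patch this check sits in the catch block of sparkOperToRDD, and visitSparkOp wraps the resulting ExecException in a RuntimeException, which is why the TestGrunt change below unwraps two causes before asserting on the error code.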

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1751219&r1=1751218&r2=1751219&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Jul  4 07:00:21 2016
@@ -215,7 +215,7 @@ public class SparkLauncher extends Launc
         convertMap.put(POPreCombinerLocalRearrange.class, new LocalRearrangeConverter());
 
         uploadResources(sparkplan);
-        new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf).visit();
+        new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID, jobConf, pigContext).visit();
         cleanUpSparkJob(sparkStats);
         sparkStats.finish();
         return sparkStats;

Modified: pig/branches/spark/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestGrunt.java?rev=1751219&r1=1751218&r2=1751219&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestGrunt.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestGrunt.java Mon Jul  4 07:00:21 2016
@@ -998,7 +998,13 @@ public class TestGrunt {
             grunt.exec();
         } catch (PigException e) {
             caught = true;
-            assertTrue(e.getErrorCode() == 6017);
+            if (!Util.isSparkExecType(cluster.getExecType())) {
+                assertTrue(e.getErrorCode() == 6017);
+            } else {
+                //In spark mode, the ExecException is wrapped in a RuntimeException and thrown out of JobGraphBuilder#sparkOperToRDD,
+                //so unwrap the exception here
+                assertTrue(((ExecException) e.getCause().getCause()).getErrorCode() == 6017);
+            }
         }
 
         assertFalse(server.existsFile("done"));
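
If the wrapping depth ever changes, the double getCause() above becomes brittle. A hedged alternative, not part of this commit, is to walk the cause chain until an ExecException turns up:

    // Illustrative helper only; not part of the patch.
    import org.apache.pig.backend.executionengine.ExecException;

    final class ExecExceptionFinder {
        private ExecExceptionFinder() {}

        // Returns the first ExecException in the cause chain, or null if none exists.
        static ExecException findExecException(Throwable t) {
            for (Throwable cur = t; cur != null; cur = cur.getCause()) {
                if (cur instanceof ExecException) {
                    return (ExecException) cur;
                }
            }
            return null;
        }
    }

With such a helper both branches of the assertion could, in principle, check the error code of the first ExecException found, assuming the outer exception in spark mode is not itself an ExecException carrying a different code.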