Posted to commits@pig.apache.org by xu...@apache.org on 2016/02/29 15:05:33 UTC

svn commit: r1732897 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java test/org/apache/pig/test/TestStoreBase.java

Author: xuefu
Date: Mon Feb 29 14:05:33 2016
New Revision: 1732897

URL: http://svn.apache.org/viewvc?rev=1732897&view=rev
Log:
PIG-4243: Fix TestStore for Spark engine (Liyun via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
    pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1732897&r1=1732896&r2=1732897&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Feb 29 14:05:33 2016
@@ -38,10 +38,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigConstants;
 import org.apache.pig.PigException;
 import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
 import org.apache.pig.backend.hadoop.executionengine.Launcher;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -110,6 +112,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.spark.SparkCounters;
 import org.apache.pig.tools.pigstats.spark.SparkPigStats;
@@ -211,9 +214,8 @@ public class SparkLauncher extends Launc
 
         uploadUDFJars(sparkplan);
         new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID).visit();
-        cleanUpSparkJob();
+        cleanUpSparkJob(sparkStats);
         sparkStats.finish();
-
         return sparkStats;
     }
 
@@ -279,7 +281,7 @@ public class SparkLauncher extends Launc
         }
     }
 
-    private void cleanUpSparkJob() {
+    private void cleanUpSparkJob(SparkPigStats sparkStats) throws ExecException {
         LOG.info("clean up Spark Job");
         boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
                 .getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
@@ -311,6 +313,28 @@ public class SparkLauncher extends Launc
                 }
             }
         }
+
+        // run cleanup for all of the stores
+        for (OutputStats output : sparkStats.getOutputStats()) {
+            POStore store = output.getPOStore();
+            try {
+                if (!output.isSuccessful()) {
+                    store.getStoreFunc().cleanupOnFailure(
+                            store.getSFile().getFileName(),
+                            Job.getInstance(output.getConf()));
+                } else {
+                    store.getStoreFunc().cleanupOnSuccess(
+                            store.getSFile().getFileName(),
+                            Job.getInstance(output.getConf()));
+                }
+            } catch (IOException e) {
+                throw new ExecException(e);
+            } catch (AbstractMethodError nsme) {
+                // Just swallow it.  This means we're running against an
+                // older instance of a StoreFunc that doesn't implement
+                // this method.
+            }
+        }
     }
 
     private void addFilesToSparkJob() throws IOException {

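For context, the cleanupOnFailure/cleanupOnSuccess calls added above belong to
Pig's StoreFuncInterface contract; cleanupOnSuccess was added in a later Pig
release than cleanupOnFailure, which is why an AbstractMethodError from an older
StoreFunc is deliberately swallowed. A minimal sketch of a store function that
implements both hooks might look like this (the class name and the
delete-on-failure behavior are hypothetical, for illustration only):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.pig.builtin.PigStorage;

    // Hypothetical StoreFunc showing the hooks that cleanUpSparkJob() invokes.
    public class CleanupAwareStorage extends PigStorage {
        @Override
        public void cleanupOnFailure(String location, Job job) throws IOException {
            // Remove partial output left behind by the failed job.
            Path out = new Path(location);
            FileSystem fs = out.getFileSystem(job.getConfiguration());
            fs.delete(out, true);
        }

        @Override
        public void cleanupOnSuccess(String location, Job job) throws IOException {
            // Nothing to clean up on success in this sketch; a real
            // implementation might remove temporary side files here.
        }
    }
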
Modified: pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java?rev=1732897&r1=1732896&r2=1732897&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java Mon Feb 29 14:05:33 2016
@@ -144,24 +144,66 @@ public abstract class TestStoreBase {
         String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
 
         Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
-        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
-        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
-        filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
-        filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE);
-        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
-        filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
+        if (mode.toString().startsWith("SPARK")) {
+            filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
+            filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
+            filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
+            filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
+             /* A = load xx;
+             store A into '1.out' using DummyStore('true','1');   -- first job should fail
+             store A into '2.out' using DummyStore('false','1');  -- second job should succeed
+             After multiQuery optimization the spark plan will be:
+           Split - scope-14
+            |   |
+            |   a: Store(hdfs://1.out:myudfs.DummyStore('true','1')) - scope-4
+            |   |
+            |   a: Store(hdfs://2.out:myudfs.DummyStore('false','1')) - scope-7
+            |
+            |---a: Load(hdfs://zly2.sh.intel.com:8020/user/root/multiStore.txt:org.apache.pig.builtin.PigStorage) - scope-0------
+            In the current code base, once the first job fails, the second job will not be executed,
+            so the FILE_SETUPJOB_CALLED marker for the second job will not exist.
+            This is explained in more detail in PIG-4243.
+            */
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
+            /*
+            Likewise, once the first job fails, the second job will not be executed,
+            so the FILE_SETUPTASK_CALLED marker for the second job will not exist.
+            */
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
+            // OutputCommitter.abortTask will not be invoked in spark mode; see SPARK-7953 for details.
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
+            // OutputCommitter.abortJob will not be invoked in spark mode; see SPARK-7953 for details.
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
+        } else {
+            filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
+            filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
+            filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
+            filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE);
+            filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
+            filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
+        }
 
         String[] inputData = new String[]{"hello\tworld", "bye\tworld"};
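
The map built above pairs each marker file with whether it should exist once the
script has run, so verification reduces to one existence check per entry. A
hypothetical helper in the spirit of what TestStoreBase does (the real
assertions live elsewhere in the test) could look like:

    import static org.junit.Assert.assertEquals;

    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Hypothetical helper class; TestStoreBase's real assertions live elsewhere.
    public class MarkerFileVerifier {
        // Each key is a marker-file path; each value says whether that file
        // should exist after the Pig script has run.
        public static void verify(Map<String, Boolean> filesToVerify) throws Exception {
            FileSystem fs = FileSystem.get(new Configuration());
            for (Map.Entry<String, Boolean> e : filesToVerify.entrySet()) {
                assertEquals("marker file: " + e.getKey(),
                        e.getValue().booleanValue(), fs.exists(new Path(e.getKey())));
            }
        }
    }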