You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/02/29 15:05:33 UTC
svn commit: r1732897 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
test/org/apache/pig/test/TestStoreBase.java
Author: xuefu
Date: Mon Feb 29 14:05:33 2016
New Revision: 1732897
URL: http://svn.apache.org/viewvc?rev=1732897&view=rev
Log:
PIG-4243: Fix TestStore for Spark engine (Liyun via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java?rev=1732897&r1=1732896&r2=1732897&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/SparkLauncher.java Mon Feb 29 14:05:33 2016
@@ -38,10 +38,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapreduce.Job;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigConstants;
import org.apache.pig.PigException;
import org.apache.pig.backend.BackendException;
+import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.Launcher;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -110,6 +112,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.spark.SparkCounters;
import org.apache.pig.tools.pigstats.spark.SparkPigStats;
@@ -211,9 +214,8 @@ public class SparkLauncher extends Launc
uploadUDFJars(sparkplan);
new JobGraphBuilder(sparkplan, convertMap, sparkStats, sparkContext, jobMetricsListener, jobGroupID).visit();
- cleanUpSparkJob();
+ cleanUpSparkJob(sparkStats);
sparkStats.finish();
-
return sparkStats;
}
@@ -279,7 +281,7 @@ public class SparkLauncher extends Launc
}
}
- private void cleanUpSparkJob() {
+ private void cleanUpSparkJob(SparkPigStats sparkStats) throws ExecException {
LOG.info("clean up Spark Job");
boolean isLocal = System.getenv("SPARK_MASTER") != null ? System
.getenv("SPARK_MASTER").equalsIgnoreCase("LOCAL") : true;
@@ -311,6 +313,28 @@ public class SparkLauncher extends Launc
}
}
}
+
+ // run cleanup for all of the stores
+ for (OutputStats output : sparkStats.getOutputStats()) {
+ POStore store = output.getPOStore();
+ try {
+ if (!output.isSuccessful()) {
+ store.getStoreFunc().cleanupOnFailure(
+ store.getSFile().getFileName(),
+ Job.getInstance(output.getConf()));
+ } else {
+ store.getStoreFunc().cleanupOnSuccess(
+ store.getSFile().getFileName(),
+ Job.getInstance(output.getConf()));
+ }
+ } catch (IOException e) {
+ throw new ExecException(e);
+ } catch (AbstractMethodError nsme) {
+ // Just swallow it. This means we're running against an
+ // older instance of a StoreFunc that doesn't implement
+ // this method.
+ }
+ }
}
private void addFilesToSparkJob() throws IOException {
Modified: pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java?rev=1732897&r1=1732896&r2=1732897&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestStoreBase.java Mon Feb 29 14:05:33 2016
@@ -144,24 +144,66 @@ public abstract class TestStoreBase {
String outputFileName2 = TESTDIR + "/TestStore-output-" + new Random().nextLong() + ".txt";
Map<String, Boolean> filesToVerify = new HashMap<String, Boolean>();
- filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
- filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
- filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
- filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE);
- filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
- filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
+ if (mode.toString().startsWith("SPARK")) {
+ filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
+ filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
+ filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
+ filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
+ /* A = load xx;
+ store A into '1.out' using DummyStore('true','1'); -- first job should fail
+ store A into '2.out' using DummyStore('false','1'); -- second job should succeed
+ After multiQuery optimization the spark plan will be:
+ Split - scope-14
+ | |
+ | a: Store(hdfs://1.out:myudfs.DummyStore('true','1')) - scope-4
+ | |
+ | a: Store(hdfs://2.out:myudfs.DummyStore('false','1')) - scope-7
+ |
+ |---a: Load(hdfs://zly2.sh.intel.com:8020/user/root/multiStore.txt:org.apache.pig.builtin.PigStorage) - scope-0------
In the current code base, once the first job fails, the second job will not be executed,
so the FILE_SETUPJOB_CALLED file of the second job will not exist.
This is explained in more detail in PIG-4243.
+ */
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
+ /*
+ In the current code base, once the first job fails, the second job will not be executed,
+ so the FILE_SETUPTASK_CALLED file of the second job will not exist.
+ */
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
+ // OutputCommitter.abortTask will not be invoked in spark mode. See SPARK-7953 for details.
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
+ // OutputCommitter.abortJob will not be invoked in spark mode. See SPARK-7953 for details.
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
+ } else {
+ filesToVerify.put(outputFileName1 + "_cleanupOnFailure_succeeded1", Boolean.TRUE);
+ filesToVerify.put(outputFileName2 + "_cleanupOnFailure_succeeded2", Boolean.TRUE);
+ filesToVerify.put(outputFileName1 + "_cleanupOnFailure_failed1", Boolean.FALSE);
+ filesToVerify.put(outputFileName2 + "_cleanupOnFailure_failed2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "1", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPJOB_CALLED + "2", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "1", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_SETUPTASK_CALLED + "2", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITTASK_CALLED + "2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "1", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTTASK_CALLED + "2", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_COMMITJOB_CALLED + "2", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "1", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_ABORTJOB_CALLED + "2", Boolean.TRUE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "1", Boolean.FALSE);
+ filesToVerify.put(DummyOutputCommitter.FILE_CLEANUPJOB_CALLED + "2", Boolean.FALSE);
+ }
String[] inputData = new String[]{"hello\tworld", "bye\tworld"};