You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/03/01 08:38:34 UTC
svn commit: r1784875 - in /pig/branches/spark:
src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
test/org/apache/pig/test/TestEmptyInputDir.java
Author: zly
Date: Wed Mar 1 08:38:33 2017
New Revision: 1784875
URL: http://svn.apache.org/viewvc?rev=1784875&view=rev
Log:
PIG-5140:fix TestEmptyInputDir unit test failure after PIG-5132(Adam via Liyun)
Modified:
pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1784875&r1=1784874&r2=1784875&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Wed Mar 1 08:38:33 2017
@@ -69,6 +69,7 @@ import com.google.common.collect.Lists;
public class JobGraphBuilder extends SparkOpPlanVisitor {
private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);
+ public static final int NULLPART_JOB_ID = -1;
private Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap = null;
private SparkPigStats sparkStats = null;
@@ -203,6 +204,21 @@ public class JobGraphBuilder extends Spa
if (!isFail) {
List<Integer> jobIDs = getJobIDs(seenJobIDs);
for (POStore poStore : poStores) {
+ if (jobIDs.size() == 0) {
+ /**
+ * Spark internally misses information about its jobs that mapped 0 partitions.
+ * Although these have valid jobIds, Spark itself is unable to tell anything about them.
+ * If the store rdd had 0 partitions we return a dummy success stat with jobId =
+ * NULLPART_JOB_ID, in any other cases we throw exception if no new jobId was seen.
+ */
+ if (physicalOpRdds.get(poStore.getOperatorKey()).partitions().length == 0) {
+ sparkStats.addJobStats(poStore, sparkOperator, NULLPART_JOB_ID, null, sparkContext);
+ return;
+ } else {
+ throw new RuntimeException("Expected at least one unseen jobID "
+ + " in this call to getJobIdsForGroup, but got 0");
+ }
+ }
SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator,
jobMetricsListener, sparkContext, sparkStats);
}
@@ -351,11 +367,6 @@ public class JobGraphBuilder extends Spa
.getJobIdsForGroup(jobGroupID))));
groupjobIDs.removeAll(seenJobIDs);
List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
- if (unseenJobIDs.size() == 0) {
- throw new RuntimeException("Expected at least one unseen jobID "
- + " in this call to getJobIdsForGroup, but got "
- + unseenJobIDs.size());
- }
seenJobIDs.addAll(unseenJobIDs);
return unseenJobIDs;
}
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1784875&r1=1784874&r2=1784875&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Mar 1 08:38:33 2017
@@ -25,6 +25,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobGraphBuilder;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -120,6 +121,9 @@ public class SparkStatsUtil {
public static boolean isJobSuccess(int jobID,
JavaSparkContext sparkContext) {
+ if (jobID == JobGraphBuilder.NULLPART_JOB_ID) {
+ return true;
+ }
JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
if (status == JobExecutionStatus.SUCCEEDED) {
return true;
Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1784875&r1=1784874&r2=1784875&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Wed Mar 1 08:38:33 2017
@@ -33,6 +33,7 @@ import org.apache.pig.PigRunner;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -85,7 +86,8 @@ public class TestEmptyInputDir {
assertEquals(0, js.getNumberMaps());
}
- assertEmptyOutputFile();
+ //Spark doesn't create an empty result file part-*, only a _SUCCESS file if input dir was empty
+ Assume.assumeTrue("Skip this test for Spark. See PIG-5140", !Util.isSparkExecType(cluster.getExecType()));
} finally {
new File(PIG_FILE).delete();
Util.deleteFile(cluster, OUTPUT_FILE);