You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by zl...@apache.org on 2017/03/01 08:38:34 UTC

svn commit: r1784875 - in /pig/branches/spark: src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java test/org/apache/pig/test/TestEmptyInputDir.java

Author: zly
Date: Wed Mar  1 08:38:33 2017
New Revision: 1784875

URL: http://svn.apache.org/viewvc?rev=1784875&view=rev
Log:
PIG-5140:fix TestEmptyInputDir unit test failure after PIG-5132(Adam via Liyun)

Modified:
    pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
    pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java

Modified: pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java?rev=1784875&r1=1784874&r2=1784875&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java (original)
+++ pig/branches/spark/src/org/apache/pig/backend/hadoop/executionengine/spark/JobGraphBuilder.java Wed Mar  1 08:38:33 2017
@@ -69,6 +69,7 @@ import com.google.common.collect.Lists;
 public class JobGraphBuilder extends SparkOpPlanVisitor {
 
     private static final Log LOG = LogFactory.getLog(JobGraphBuilder.class);
+    public static final int NULLPART_JOB_ID = -1;
 
     private Map<Class<? extends PhysicalOperator>, RDDConverter> convertMap = null;
     private SparkPigStats sparkStats = null;
@@ -203,6 +204,21 @@ public class JobGraphBuilder extends Spa
                 if (!isFail) {
                     List<Integer> jobIDs = getJobIDs(seenJobIDs);
                     for (POStore poStore : poStores) {
+                        if (jobIDs.size() == 0) {
+                            /**
+                             * Spark internally misses information about its jobs that mapped 0 partitions.
+                             * Although these have valid jobIds, Spark itself is unable to tell anything about them.
+                             * If the store rdd had 0 partitions we return a dummy success stat with jobId =
+                             * NULLPART_JOB_ID, in any other cases we throw exception if no new jobId was seen.
+                             */
+                            if (physicalOpRdds.get(poStore.getOperatorKey()).partitions().length == 0) {
+                                sparkStats.addJobStats(poStore, sparkOperator, NULLPART_JOB_ID, null, sparkContext);
+                                return;
+                            } else {
+                                throw new RuntimeException("Expected at least one unseen jobID "
+                                        + " in this call to getJobIdsForGroup, but got 0");
+                            }
+                        }
                         SparkStatsUtil.waitForJobAddStats(jobIDs.get(i++), poStore, sparkOperator,
                                 jobMetricsListener, sparkContext, sparkStats);
                     }
@@ -351,11 +367,6 @@ public class JobGraphBuilder extends Spa
                         .getJobIdsForGroup(jobGroupID))));
         groupjobIDs.removeAll(seenJobIDs);
         List<Integer> unseenJobIDs = new ArrayList<Integer>(groupjobIDs);
-        if (unseenJobIDs.size() == 0) {
-            throw new RuntimeException("Expected at least one unseen jobID "
-                    + " in this call to getJobIdsForGroup, but got "
-                    + unseenJobIDs.size());
-        }
         seenJobIDs.addAll(unseenJobIDs);
         return unseenJobIDs;
     }

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java?rev=1784875&r1=1784874&r2=1784875&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkStatsUtil.java Wed Mar  1 08:38:33 2017
@@ -25,6 +25,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.spark.JobGraphBuilder;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
 import org.apache.pig.backend.hadoop.executionengine.spark.operator.NativeSparkOperator;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -120,6 +121,9 @@ public class SparkStatsUtil {
 
     public static boolean isJobSuccess(int jobID,
                                        JavaSparkContext sparkContext) {
+        if (jobID == JobGraphBuilder.NULLPART_JOB_ID) {
+            return true;
+        }
         JobExecutionStatus status = getJobInfo(jobID, sparkContext).status();
         if (status == JobExecutionStatus.SUCCEEDED) {
             return true;

Modified: pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java?rev=1784875&r1=1784874&r2=1784875&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestEmptyInputDir.java Wed Mar  1 08:38:33 2017
@@ -33,6 +33,7 @@ import org.apache.pig.PigRunner;
 import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.junit.AfterClass;
+import org.junit.Assume;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
@@ -85,7 +86,8 @@ public class TestEmptyInputDir {
                 assertEquals(0, js.getNumberMaps());
             }
 
-            assertEmptyOutputFile();
+            //Spark doesn't create an empty result file part-*, only a _SUCCESS file if input dir was empty
+            Assume.assumeTrue("Skip this test for Spark. See PIG-5140", !Util.isSparkExecType(cluster.getExecType()));
         } finally {
             new File(PIG_FILE).delete();
             Util.deleteFile(cluster, OUTPUT_FILE);