You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/02/15 06:33:13 UTC

svn commit: r1730447 - in /pig/branches/spark: src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java test/org/apache/pig/test/TestPigRunner.java

Author: xuefu
Date: Mon Feb 15 05:33:12 2016
New Revision: 1730447

URL: http://svn.apache.org/viewvc?rev=1730447&view=rev
Log:
PIG-4616: Fix UT errors of TestPigRunner in Spark mode (Xianda via Xuefu)

Modified:
    pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
    pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java

Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1730447&r1=1730446&r2=1730447&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Mon Feb 15 05:33:12 2016
@@ -21,13 +21,11 @@ package org.apache.pig.tools.pigstats.sp
 import java.util.List;
 import java.util.Map;
 
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import scala.Option;
 
-import com.google.common.collect.Maps;
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
 import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -37,16 +35,20 @@ import org.apache.pig.tools.pigstats.Inp
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
-
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
 import org.apache.spark.executor.ShuffleReadMetrics;
 import org.apache.spark.executor.ShuffleWriteMetrics;
 import org.apache.spark.executor.TaskMetrics;
 
+import com.google.common.collect.Maps;
+
 public class SparkJobStats extends JobStats {
 
     private int jobId;
     private Map<String, Long> stats = Maps.newLinkedHashMap();
     private boolean disableCounter;
+    private Counters counters = null;
+    public static String FS_COUNTER_GROUP = "FS_GROUP";
 
     protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
         this(String.valueOf(jobId), plan, conf);
@@ -61,6 +63,7 @@ public class SparkJobStats extends JobSt
     public void setConf(Configuration conf) {
         super.setConf(conf);
         disableCounter = conf.getBoolean("pig.disable.counter", false);
+        initializeHadoopCounter();
     }
 
     public void addOutputInfo(POStore poStore, boolean success,
@@ -186,10 +189,14 @@ public class SparkJobStats extends JobSt
         results.put("DiskBytesSpilled", diskBytesSpilled);
         if (inputMetricExist) {
             results.put("BytesRead", bytesRead);
+            hdfsBytesRead = bytesRead;
+            counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
         }
 
         if (outputMetricExist) {
             results.put("BytesWritten", bytesWritten);
+            hdfsBytesWritten = bytesWritten;
+            counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
         }
 
         if (shuffleReadMetricExist) {
@@ -300,7 +307,7 @@ public class SparkJobStats extends JobSt
 
     @Override
     public Counters getHadoopCounters() {
-        throw new UnsupportedOperationException();
+        return counters;
     }
 
     @Override
@@ -320,4 +327,11 @@ public class SparkJobStats extends JobSt
         annotate(ALIAS_LOCATION, sparkScriptInfo.getAliasLocation(sparkOperator));
         annotate(FEATURE, sparkScriptInfo.getPigFeatures(sparkOperator));
     }
+
+    private void initializeHadoopCounter() {
+        counters = new Counters();
+        Counters.Group fsGrp = counters.addGroup(FS_COUNTER_GROUP, FS_COUNTER_GROUP);
+        fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_READ, MRPigStatsUtil.HDFS_BYTES_READ, 0);
+        fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_WRITTEN, MRPigStatsUtil.HDFS_BYTES_WRITTEN, 0);
+    }
 }

Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1730447&r1=1730446&r2=1730447&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Mon Feb 15 05:33:12 2016
@@ -35,7 +35,6 @@ import java.util.Properties;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapred.Counters;
-import org.apache.pig.ExecType;
 import org.apache.pig.PigConfiguration;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigRunner.ReturnCode;
@@ -45,7 +44,6 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.newplan.Operator;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
 import org.apache.pig.tools.pigstats.InputStats;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
@@ -54,6 +52,7 @@ import org.apache.pig.tools.pigstats.Pig
 import org.apache.pig.tools.pigstats.PigStatsUtil;
 import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
 import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.pig.tools.pigstats.spark.SparkJobStats;
 import org.junit.AfterClass;
 import org.junit.Assume;
 import org.junit.Before;
@@ -200,6 +199,11 @@ public class TestPigRunner {
             if (execType.toString().startsWith("tez")) {
                 assertEquals(1, stats.getNumberJobs());
                 assertEquals(stats.getJobGraph().size(), 1);
+            } else if (execType.toString().startsWith("spark")) {
+                // In spark mode,the number of spark job is calculated by the number of POStore.
+                // 1 POStore generates 1 spark job.
+                assertEquals(1, stats.getNumberJobs());
+                assertEquals(stats.getJobGraph().size(), 1);
             } else {
                 assertEquals(2, stats.getNumberJobs());
                 assertEquals(stats.getJobGraph().size(), 2);
@@ -264,6 +268,10 @@ public class TestPigRunner {
                 assertEquals(stats.getJobGraph().size(), 1);
                 // 5 vertices
                 assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5);
+            } else if (execType.equals("spark")) {
+                // In spark mode,the number of spark job is calculated by the number of POStore.
+                // 1 POStore generates 1 spark job.
+                assertEquals(stats.getJobGraph().size(), 1);
             } else {
                 assertEquals(stats.getJobGraph().size(), 4);
             }
@@ -284,7 +292,12 @@ public class TestPigRunner {
                 //       Need to investigate
                 // assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
                 //        js).get(0)).getAlias());
+            } else if (execType.equals("spark")) {
+                assertEquals("A,B", ((JobStats) stats.getJobGraph().getSources().get(
+                        0)).getAlias());
+                // TODO: alias is not set for sample-aggregation/partition/sort job.
             } else {
+
                 assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
                         0)).getAlias());
                 assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
@@ -313,7 +326,14 @@ public class TestPigRunner {
             String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
-            assertTrue(stats.getJobGraph().size() == 1);
+            if (execType.equals("spark")) {
+                // In spark mode,the number of spark job is calculated by the number of POStore.
+                // 2 POStore generates 2 spark jobs.
+                assertTrue(stats.getJobGraph().size() == 2);
+            } else {
+                assertTrue(stats.getJobGraph().size() == 1);
+            }
+
             // Each output file should include the following:
             // output:
             //   1\t2\t3\n
@@ -362,7 +382,13 @@ public class TestPigRunner {
             String[] args = { "-x", execType, PIG_FILE };
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
             assertTrue(stats.isSuccessful());
-            assertEquals(stats.getJobGraph().size(), 1);
+            if (execType.equals("spark")) {
+                // In spark mode,the number of spark job is calculated by the number of POStore.
+                // 2 POStore generates 2 spark jobs.
+                assertEquals(stats.getJobGraph().size(), 2);
+            } else {
+                assertEquals(stats.getJobGraph().size(), 1);
+            }
 
             // Each output file should include the following:
             // output:
@@ -411,17 +437,19 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, null);
             Iterator<JobStats> iter = stats.getJobGraph().iterator();
             while (iter.hasNext()) {
-                 JobStats js=iter.next();
-                 if (execType.equals("tez")) {
-                     assertEquals(js.getState().name(), "FAILED");
-                 } else {
-                     if(js.getState().name().equals("FAILED")) {
-                         List<Operator> ops=stats.getJobGraph().getSuccessors(js);
-                         for(Operator op : ops ) {
-                             assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
-                         }
-                     }
-                 }
+                JobStats js=iter.next();
+                if (execType.equals("tez")) {
+                    assertEquals(js.getState().name(), "FAILED");
+                } else if (execType.equals("spark")) {
+                    assertEquals(js.getState().name(), "FAILED");
+                } else {
+                    if(js.getState().name().equals("FAILED")) {
+                        List<Operator> ops=stats.getJobGraph().getSuccessors(js);
+                        for(Operator op : ops ) {
+                            assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+                        }
+                    }
+                }
             }
         } finally {
             new File(PIG_FILE).delete();
@@ -630,8 +658,14 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
-
-            assertEquals(1, stats.getNumberJobs());
+            //In spark mode, one POStore will generate a spark action(spark job).
+            //In this case, the sparkplan has 1 sparkOperator(after multiquery optimization) but has 2 POStores
+            //which generate 2 spark actions(spark jobs).
+            if (execType.equals("spark")) {
+                assertEquals(2, stats.getNumberJobs());
+            } else {
+                assertEquals(1, stats.getNumberJobs());
+            }
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(2, outputs.size());
             for (OutputStats outstats : outputs) {
@@ -743,7 +777,14 @@ public class TestPigRunner {
 
             assertTrue(!stats.isSuccessful());
             assertTrue(stats.getReturnCode() != 0);
-            assertTrue(stats.getOutputStats().size() == 0);
+            if (execType.equals("spark")) {
+                //Currently, even if failed, spark engine will add a failed OutputStats,
+                // see: SparkPigStats.addFailJobStats()
+                assertTrue(stats.getOutputStats().size() == 1);
+                assertTrue(stats.getOutputStats().get(0).isSuccessful() == false);
+            } else {
+                assertTrue(stats.getOutputStats().size() == 0);
+            }
 
         } finally {
             new File(PIG_FILE).delete();
@@ -766,7 +807,14 @@ public class TestPigRunner {
 
             assertTrue(!stats.isSuccessful());
             assertTrue(stats.getReturnCode() != 0);
-            assertTrue(stats.getOutputStats().size() == 0);
+            //Currently, even if failed, spark engine will add a failed OutputStats,
+            // see: SparkPigStats.addFailJobStats()
+            if (execType.equals("spark")) {
+                assertTrue(stats.getOutputStats().size() == 1);
+                assertTrue(stats.getOutputStats().get(0).isSuccessful() == false);
+            } else {
+                assertTrue(stats.getOutputStats().size() == 0);
+            }
 
         } finally {
             new File(PIG_FILE).delete();
@@ -827,8 +875,14 @@ public class TestPigRunner {
             PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
 
             assertTrue(stats.isSuccessful());
-
-            assertEquals(1, stats.getNumberJobs());
+            //In spark mode, one POStore will generate a spark action(spark job).
+            //In this case, the sparkplan has 1 sparkOperator(after multiquery optimization) but has 2 POStores
+            //which generate 2 spark actions(spark jobs).
+            if (execType.equals("spark")) {
+                assertEquals(2, stats.getNumberJobs());
+            } else {
+                assertEquals(1, stats.getNumberJobs());
+            }
             List<OutputStats> outputs = stats.getOutputStats();
             assertEquals(2, outputs.size());
             for (OutputStats outstats : outputs) {
@@ -922,6 +976,26 @@ public class TestPigRunner {
                         MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
                 assertEquals(30,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
                         MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+            } else if (execType.equals("spark")) {
+                //There are 2 spark jobs because of 2 POStore although the spark plan is optimized by multiquery optimization.
+                List<JobStats> jobs = stats.getJobGraph().getJobList();
+                JobStats firstJob = jobs.get(0);
+                JobStats secondJob = jobs.get(1);
+                //the hdfs_bytes_read of two spark jobs are same(because the two spark jobs have same poLoad), we only
+                //use one of those to compare with expected hdfs_bytes_read(30)
+                //we count the hdfs_bytes_written of the two spark jobs to calculate the total hdfs_bytes_written
+                long hdfs_bytes_read = 0;
+                long hdfs_bytes_written = 0;
+
+                hdfs_bytes_read += firstJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_READ).getValue();
+                hdfs_bytes_written += firstJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue();
+                hdfs_bytes_written += secondJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName(
+                        MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue();
+
+                assertEquals(30, hdfs_bytes_read);
+                assertEquals(20, hdfs_bytes_written);
             } else {
                 Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
                 assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(