You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2016/02/15 06:33:13 UTC
svn commit: r1730447 - in /pig/branches/spark:
src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
test/org/apache/pig/test/TestPigRunner.java
Author: xuefu
Date: Mon Feb 15 05:33:12 2016
New Revision: 1730447
URL: http://svn.apache.org/viewvc?rev=1730447&view=rev
Log:
PIG-4616: Fix UT errors of TestPigRunner in Spark mode (Xianda via Xuefu)
Modified:
pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
Modified: pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java?rev=1730447&r1=1730446&r2=1730447&view=diff
==============================================================================
--- pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java (original)
+++ pig/branches/spark/src/org/apache/pig/tools/pigstats/spark/SparkJobStats.java Mon Feb 15 05:33:12 2016
@@ -21,13 +21,11 @@ package org.apache.pig.tools.pigstats.sp
import java.util.List;
import java.util.Map;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import scala.Option;
-import com.google.common.collect.Maps;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.Counters;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.spark.JobMetricsListener;
import org.apache.pig.backend.hadoop.executionengine.spark.plan.SparkOperator;
@@ -37,16 +35,20 @@ import org.apache.pig.tools.pigstats.Inp
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
-
+import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
import org.apache.spark.executor.ShuffleReadMetrics;
import org.apache.spark.executor.ShuffleWriteMetrics;
import org.apache.spark.executor.TaskMetrics;
+import com.google.common.collect.Maps;
+
public class SparkJobStats extends JobStats {
private int jobId;
private Map<String, Long> stats = Maps.newLinkedHashMap();
private boolean disableCounter;
+ private Counters counters = null;
+ public static String FS_COUNTER_GROUP = "FS_GROUP";
protected SparkJobStats(int jobId, PigStats.JobGraph plan, Configuration conf) {
this(String.valueOf(jobId), plan, conf);
@@ -61,6 +63,7 @@ public class SparkJobStats extends JobSt
public void setConf(Configuration conf) {
super.setConf(conf);
disableCounter = conf.getBoolean("pig.disable.counter", false);
+ initializeHadoopCounter();
}
public void addOutputInfo(POStore poStore, boolean success,
@@ -186,10 +189,14 @@ public class SparkJobStats extends JobSt
results.put("DiskBytesSpilled", diskBytesSpilled);
if (inputMetricExist) {
results.put("BytesRead", bytesRead);
+ hdfsBytesRead = bytesRead;
+ counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_READ, hdfsBytesRead);
}
if (outputMetricExist) {
results.put("BytesWritten", bytesWritten);
+ hdfsBytesWritten = bytesWritten;
+ counters.incrCounter(FS_COUNTER_GROUP, MRPigStatsUtil.HDFS_BYTES_WRITTEN, hdfsBytesWritten);
}
if (shuffleReadMetricExist) {
@@ -300,7 +307,7 @@ public class SparkJobStats extends JobSt
@Override
public Counters getHadoopCounters() {
- throw new UnsupportedOperationException();
+ return counters;
}
@Override
@@ -320,4 +327,11 @@ public class SparkJobStats extends JobSt
annotate(ALIAS_LOCATION, sparkScriptInfo.getAliasLocation(sparkOperator));
annotate(FEATURE, sparkScriptInfo.getPigFeatures(sparkOperator));
}
+
+ private void initializeHadoopCounter() {
+ counters = new Counters();
+ Counters.Group fsGrp = counters.addGroup(FS_COUNTER_GROUP, FS_COUNTER_GROUP);
+ fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_READ, MRPigStatsUtil.HDFS_BYTES_READ, 0);
+ fsGrp.addCounter(MRPigStatsUtil.HDFS_BYTES_WRITTEN, MRPigStatsUtil.HDFS_BYTES_WRITTEN, 0);
+ }
}
Modified: pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java?rev=1730447&r1=1730446&r2=1730447&view=diff
==============================================================================
--- pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java (original)
+++ pig/branches/spark/test/org/apache/pig/test/TestPigRunner.java Mon Feb 15 05:33:12 2016
@@ -35,7 +35,6 @@ import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.Counters;
-import org.apache.pig.ExecType;
import org.apache.pig.PigConfiguration;
import org.apache.pig.PigRunner;
import org.apache.pig.PigRunner.ReturnCode;
@@ -45,7 +44,6 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.plan.OperatorPlan;
import org.apache.pig.newplan.Operator;
-import org.apache.pig.tools.pigstats.EmptyPigStats;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.OutputStats;
@@ -54,6 +52,7 @@ import org.apache.pig.tools.pigstats.Pig
import org.apache.pig.tools.pigstats.PigStatsUtil;
import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.apache.pig.tools.pigstats.mapreduce.MRPigStatsUtil;
+import org.apache.pig.tools.pigstats.spark.SparkJobStats;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.Before;
@@ -200,6 +199,11 @@ public class TestPigRunner {
if (execType.toString().startsWith("tez")) {
assertEquals(1, stats.getNumberJobs());
assertEquals(stats.getJobGraph().size(), 1);
+ } else if (execType.toString().startsWith("spark")) {
+ // In Spark mode, the number of Spark jobs is determined by the number of POStores:
+ // 1 POStore generates 1 Spark job.
+ assertEquals(1, stats.getNumberJobs());
+ assertEquals(stats.getJobGraph().size(), 1);
} else {
assertEquals(2, stats.getNumberJobs());
assertEquals(stats.getJobGraph().size(), 2);
@@ -264,6 +268,10 @@ public class TestPigRunner {
assertEquals(stats.getJobGraph().size(), 1);
// 5 vertices
assertEquals(stats.getJobGraph().getSources().get(0).getPlan().size(), 5);
+ } else if (execType.equals("spark")) {
+ // In Spark mode, the number of Spark jobs is determined by the number of POStores:
+ // 1 POStore generates 1 Spark job.
+ assertEquals(stats.getJobGraph().size(), 1);
} else {
assertEquals(stats.getJobGraph().size(), 4);
}
@@ -284,7 +292,12 @@ public class TestPigRunner {
// Need to investigate
// assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
// js).get(0)).getAlias());
+ } else if (execType.equals("spark")) {
+ assertEquals("A,B", ((JobStats) stats.getJobGraph().getSources().get(
+ 0)).getAlias());
+ // TODO: alias is not set for sample-aggregation/partition/sort job.
} else {
+
assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
0)).getAlias());
assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
@@ -313,7 +326,14 @@ public class TestPigRunner {
String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
- assertTrue(stats.getJobGraph().size() == 1);
+ if (execType.equals("spark")) {
+ // In Spark mode, the number of Spark jobs is determined by the number of POStores:
+ // 2 POStores generate 2 Spark jobs.
+ assertTrue(stats.getJobGraph().size() == 2);
+ } else {
+ assertTrue(stats.getJobGraph().size() == 1);
+ }
+
// Each output file should include the following:
// output:
// 1\t2\t3\n
@@ -362,7 +382,13 @@ public class TestPigRunner {
String[] args = { "-x", execType, PIG_FILE };
PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
- assertEquals(stats.getJobGraph().size(), 1);
+ if (execType.equals("spark")) {
+ // In Spark mode, the number of Spark jobs is determined by the number of POStores:
+ // 2 POStores generate 2 Spark jobs.
+ assertEquals(stats.getJobGraph().size(), 2);
+ } else {
+ assertEquals(stats.getJobGraph().size(), 1);
+ }
// Each output file should include the following:
// output:
@@ -411,17 +437,19 @@ public class TestPigRunner {
PigStats stats = PigRunner.run(args, null);
Iterator<JobStats> iter = stats.getJobGraph().iterator();
while (iter.hasNext()) {
- JobStats js=iter.next();
- if (execType.equals("tez")) {
- assertEquals(js.getState().name(), "FAILED");
- } else {
- if(js.getState().name().equals("FAILED")) {
- List<Operator> ops=stats.getJobGraph().getSuccessors(js);
- for(Operator op : ops ) {
- assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
- }
- }
- }
+ JobStats js=iter.next();
+ if (execType.equals("tez")) {
+ assertEquals(js.getState().name(), "FAILED");
+ } else if (execType.equals("spark")) {
+ assertEquals(js.getState().name(), "FAILED");
+ } else {
+ if(js.getState().name().equals("FAILED")) {
+ List<Operator> ops=stats.getJobGraph().getSuccessors(js);
+ for(Operator op : ops ) {
+ assertEquals(((JobStats)op).getState().toString(), "UNKNOWN");
+ }
+ }
+ }
}
} finally {
new File(PIG_FILE).delete();
@@ -630,8 +658,14 @@ public class TestPigRunner {
PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
-
- assertEquals(1, stats.getNumberJobs());
+ // In Spark mode, each POStore generates one Spark action (Spark job).
+ // In this case, the Spark plan has 1 SparkOperator (after multiquery optimization) but has 2 POStores,
+ // which generate 2 Spark actions (Spark jobs).
+ if (execType.equals("spark")) {
+ assertEquals(2, stats.getNumberJobs());
+ } else {
+ assertEquals(1, stats.getNumberJobs());
+ }
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(2, outputs.size());
for (OutputStats outstats : outputs) {
@@ -743,7 +777,14 @@ public class TestPigRunner {
assertTrue(!stats.isSuccessful());
assertTrue(stats.getReturnCode() != 0);
- assertTrue(stats.getOutputStats().size() == 0);
+ if (execType.equals("spark")) {
+ // Currently, even on failure, the Spark engine adds a failed OutputStats;
+ // see SparkPigStats.addFailJobStats().
+ assertTrue(stats.getOutputStats().size() == 1);
+ assertTrue(stats.getOutputStats().get(0).isSuccessful() == false);
+ } else {
+ assertTrue(stats.getOutputStats().size() == 0);
+ }
} finally {
new File(PIG_FILE).delete();
@@ -766,7 +807,14 @@ public class TestPigRunner {
assertTrue(!stats.isSuccessful());
assertTrue(stats.getReturnCode() != 0);
- assertTrue(stats.getOutputStats().size() == 0);
+ // Currently, even on failure, the Spark engine adds a failed OutputStats;
+ // see SparkPigStats.addFailJobStats().
+ if (execType.equals("spark")) {
+ assertTrue(stats.getOutputStats().size() == 1);
+ assertTrue(stats.getOutputStats().get(0).isSuccessful() == false);
+ } else {
+ assertTrue(stats.getOutputStats().size() == 0);
+ }
} finally {
new File(PIG_FILE).delete();
@@ -827,8 +875,14 @@ public class TestPigRunner {
PigStats stats = PigRunner.run(args, new TestNotificationListener(execType));
assertTrue(stats.isSuccessful());
-
- assertEquals(1, stats.getNumberJobs());
+ // In Spark mode, each POStore generates one Spark action (Spark job).
+ // In this case, the Spark plan has 1 SparkOperator (after multiquery optimization) but has 2 POStores,
+ // which generate 2 Spark actions (Spark jobs).
+ if (execType.equals("spark")) {
+ assertEquals(2, stats.getNumberJobs());
+ } else {
+ assertEquals(1, stats.getNumberJobs());
+ }
List<OutputStats> outputs = stats.getOutputStats();
assertEquals(2, outputs.size());
for (OutputStats outstats : outputs) {
@@ -922,6 +976,26 @@ public class TestPigRunner {
MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue());
assertEquals(30,counter.getGroup(FS_COUNTER_GROUP).getCounterForName(
MRPigStatsUtil.HDFS_BYTES_READ).getValue());
+ } else if (execType.equals("spark")) {
+ // There are 2 Spark jobs because of the 2 POStores, even though the Spark plan is optimized by multiquery optimization.
+ List<JobStats> jobs = stats.getJobGraph().getJobList();
+ JobStats firstJob = jobs.get(0);
+ JobStats secondJob = jobs.get(1);
+ // The hdfs_bytes_read values of the two Spark jobs are the same (because both jobs share the same POLoad),
+ // so we use only one of them to compare with the expected hdfs_bytes_read (30).
+ // We sum the hdfs_bytes_written of the two Spark jobs to get the total hdfs_bytes_written.
+ long hdfs_bytes_read = 0;
+ long hdfs_bytes_written = 0;
+
+ hdfs_bytes_read += firstJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_READ).getValue();
+ hdfs_bytes_written += firstJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue();
+ hdfs_bytes_written += secondJob.getHadoopCounters().getGroup(SparkJobStats.FS_COUNTER_GROUP).getCounterForName(
+ MRPigStatsUtil.HDFS_BYTES_WRITTEN).getValue();
+
+ assertEquals(30, hdfs_bytes_read);
+ assertEquals(20, hdfs_bytes_written);
} else {
Counters counter= ((MRJobStats)stats.getJobGraph().getSinks().get(0)).getHadoopCounters();
assertEquals(5, counter.getGroup(MRPigStatsUtil.TASK_COUNTER_GROUP).getCounterForName(