You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2016/09/14 20:28:54 UTC
svn commit: r1760751 - in /pig/branches/branch-0.16: ./
src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/
src/org/apache/pig/tools/pigstats/tez/ test/org/apache/pig/test/
Author: rohini
Date: Wed Sep 14 20:28:53 2016
New Revision: 1760751
URL: http://svn.apache.org/viewvc?rev=1760751&view=rev
Log:
PIG-5032: Output record stats in Tez is wrong when there is split followed by union (rohini)
Modified:
pig/branches/branch-0.16/CHANGES.txt
pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
pig/branches/branch-0.16/test/org/apache/pig/test/TestCounters.java
Modified: pig/branches/branch-0.16/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/CHANGES.txt?rev=1760751&r1=1760750&r2=1760751&view=diff
==============================================================================
--- pig/branches/branch-0.16/CHANGES.txt (original)
+++ pig/branches/branch-0.16/CHANGES.txt Wed Sep 14 20:28:53 2016
@@ -30,6 +30,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-5032: Output record stats in Tez is wrong when there is split followed by union (rohini)
+
PIG-5019: Pig generates tons of warnings for udf with enabled warnings aggregation (murshyd via rohini)
PIG-4974: A simple map reference fail to cast (knoguchi)
Modified: pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java?rev=1760751&r1=1760750&r2=1760751&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POStoreTez.java Wed Sep 14 20:28:53 2016
@@ -102,19 +102,19 @@ public class POStoreTez extends POStore
throw new ExecException(e);
}
- // Multiple outputs - can be another store or other outputs (shuffle, broadcast)
- if (outputs.size() > 1) {
- CounterGroup multiStoreGroup = processorContext.getCounters()
- .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- if (multiStoreGroup == null) {
- processorContext.getCounters().addGroup(
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
- MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
- }
- String name = MRPigStatsUtil.getMultiStoreCounterName(this);
- if (name != null) {
- outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
- }
+ // Even if there is a single hdfs output, we add multi store counter
+ // Makes it easier for user to see records for a particular store from
+ // the DAG counter
+ CounterGroup multiStoreGroup = processorContext.getCounters()
+ .getGroup(MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ if (multiStoreGroup == null) {
+ processorContext.getCounters().addGroup(
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP,
+ MRPigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ }
+ String name = MRPigStatsUtil.getMultiStoreCounterName(this);
+ if (name != null) {
+ outputRecordCounter = multiStoreGroup.addCounter(name, name, 0);
}
}
Modified: pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java?rev=1760751&r1=1760750&r2=1760751&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezDAGStats.java Wed Sep 14 20:28:53 2016
@@ -245,7 +245,11 @@ public class TezDAGStats extends JobStat
OutputStats existingOut = outputsByLocation.get(output.getLocation());
// In case of multistore, bytesWritten is already calculated
// from size of all the files in the output directory.
- if (!output.getPOStore().isMultiStore() && output.getBytes() > -1) {
+ // So use that if there is a combination of multistore and single store
+ if (output.getPOStore().isMultiStore()) {
+ existingOut.setBytes(output.getBytes());
+ existingOut.setPOStore(output.getPOStore());
+ } else if (!existingOut.getPOStore().isMultiStore() && output.getBytes() > -1) {
long bytes = existingOut.getBytes() > -1
? (existingOut.getBytes() + output.getBytes())
: output.getBytes();
Modified: pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java?rev=1760751&r1=1760750&r2=1760751&view=diff
==============================================================================
--- pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java (original)
+++ pig/branches/branch-0.16/src/org/apache/pig/tools/pigstats/tez/TezVertexStats.java Wed Sep 14 20:28:53 2016
@@ -22,6 +22,7 @@ import static org.apache.pig.tools.pigst
import static org.apache.pig.tools.pigstats.tez.TezDAGStats.TASK_COUNTER_GROUP;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
@@ -289,13 +290,19 @@ public class TezVertexStats extends JobS
}
// Split followed by union will have multiple stores writing to same location
- Map<String, POStore> uniqueOutputs = new HashMap<String, POStore>();
+ Map<String, List<POStore>> uniqueOutputs = new HashMap<String, List<POStore>>();
for (POStore sto : stores) {
POStoreTez store = (POStoreTez) sto;
- uniqueOutputs.put(store.getOutputKey(), store);
+ List<POStore> stores = uniqueOutputs.get(store.getOutputKey());
+ if (stores == null) {
+ stores = new ArrayList<POStore>();
+ }
+ stores.add(store);
+ uniqueOutputs.put(store.getOutputKey(), stores);
}
- for (POStore sto : uniqueOutputs.values()) {
+ for (List<POStore> stores : uniqueOutputs.values()) {
+ POStore sto = stores.get(0);
if (sto.isTmpStore()) {
continue;
}
@@ -304,11 +311,16 @@ public class TezVertexStats extends JobS
String filename = sto.getSFile().getFileName();
if (counters != null) {
if (msGroup != null) {
- Long n = msGroup.get(PigStatsUtil.getMultiStoreCounterName(sto));
- if (n != null) records = n;
- }
- if (records == -1) {
- records = outputRecords;
+ long n = 0;
+ Long val = null;
+ for (POStore store : stores) {
+ val = msGroup.get(PigStatsUtil.getMultiStoreCounterName(store));
+ // Tez removes 0 value counters for efficiency.
+ if (val != null) {
+ n += val;
+ };
+ }
+ records = n;
}
if (isSuccessful() && records == -1) {
// Tez removes 0 value counters for efficiency.
@@ -338,13 +350,13 @@ public class TezVertexStats extends JobS
@Override
@Deprecated
public int getNumberMaps() {
- return this.isMapOpts ? numTasks : -1;
+ return this.isMapOpts ? numTasks : 0;
}
@Override
@Deprecated
public int getNumberReduces() {
- return this.isMapOpts ? -1 : numTasks;
+ return this.isMapOpts ? 0 : numTasks;
}
@Override
@@ -386,25 +398,25 @@ public class TezVertexStats extends JobS
@Override
@Deprecated
public long getMapInputRecords() {
- return this.isMapOpts ? numInputRecords : -1;
+ return this.isMapOpts ? numInputRecords : 0;
}
@Override
@Deprecated
public long getMapOutputRecords() {
- return this.isMapOpts ? numOutputRecords : -1;
+ return this.isMapOpts ? numOutputRecords : 0;
}
@Override
@Deprecated
public long getReduceInputRecords() {
- return this.isMapOpts ? -1 : numInputRecords;
+ return numReduceInputRecords;
}
@Override
@Deprecated
public long getReduceOutputRecords() {
- return this.isMapOpts ? -1 : numOutputRecords;
+ return this.isMapOpts ? 0 : numOutputRecords;
}
@Override
Modified: pig/branches/branch-0.16/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.16/test/org/apache/pig/test/TestCounters.java?rev=1760751&r1=1760750&r2=1760751&view=diff
==============================================================================
--- pig/branches/branch-0.16/test/org/apache/pig/test/TestCounters.java (original)
+++ pig/branches/branch-0.16/test/org/apache/pig/test/TestCounters.java Wed Sep 14 20:28:53 2016
@@ -30,17 +30,17 @@ import java.util.Map;
import java.util.Random;
import org.apache.hadoop.fs.Path;
-import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.tools.pigstats.InputStats;
import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.tools.pigstats.PigStats.JobGraph;
-import org.apache.pig.tools.pigstats.mapreduce.MRJobStats;
import org.junit.AfterClass;
+import org.junit.Assume;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -49,8 +49,8 @@ import org.junit.runners.JUnit4;
public class TestCounters {
String file = "input.txt";
- static MiniCluster cluster = MiniCluster.buildCluster();
-
+ static MiniGenericCluster cluster = MiniGenericCluster.buildCluster();
+
final int MAX = 100*1000;
Random r = new Random();
@@ -59,7 +59,7 @@ public class TestCounters {
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
@Test
public void testMapOnly() throws IOException, ExecException {
int count = 0;
@@ -70,13 +70,13 @@ public class TestCounters {
if(t > 50) count ++;
}
pw.close();
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
ExecJob job = pigServer.store("c", "output_map_only");
PigStats pigStats = job.getStatistics();
-
+
//counting the no. of bytes in the output file
//long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
@@ -85,9 +85,9 @@ public class TestCounters {
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output_map_only"), true);
@@ -98,7 +98,7 @@ public class TestCounters {
JobGraph jg = pigStats.getJobGraph();
Iterator<JobStats> iter = jg.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
@@ -123,20 +123,20 @@ public class TestCounters {
count ++;
}
pw.close();
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
PigStats pigStats = job.getStatistics();
-
+
InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
"output_map_only", pigServer.getPigContext()),
pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
cluster.getFileSystem().delete(new Path(file), true);
@@ -149,8 +149,8 @@ public class TestCounters {
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
-
+ JobStats js = iter.next();
+
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -158,7 +158,7 @@ public class TestCounters {
assertEquals(0, js.getReduceInputRecords());
assertEquals(0, js.getReduceOutputRecords());
}
-
+
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
@@ -183,7 +183,7 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
@@ -195,7 +195,7 @@ public class TestCounters {
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
cluster.getFileSystem().delete(new Path(file), true);
@@ -208,7 +208,7 @@ public class TestCounters {
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -242,7 +242,7 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
@@ -253,9 +253,9 @@ public class TestCounters {
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
@@ -266,7 +266,7 @@ public class TestCounters {
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -300,7 +300,7 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
@@ -311,20 +311,20 @@ public class TestCounters {
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
-
+
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -337,7 +337,7 @@ public class TestCounters {
System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
-
+
@Test
public void testMapCombineReduceBinStorage() throws IOException, ExecException {
int count = 0;
@@ -358,20 +358,20 @@ public class TestCounters {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
ExecJob job = pigServer.store("c", "output", "BinStorage");
PigStats pigStats = job.getStatistics();
-
+
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
@@ -379,11 +379,11 @@ public class TestCounters {
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
-
+
JobGraph jp = pigStats.getJobGraph();
Iterator<JobStats> iter = jp.iterator();
while (iter.hasNext()) {
- MRJobStats js = (MRJobStats) iter.next();
+ JobStats js = iter.next();
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -399,6 +399,8 @@ public class TestCounters {
@Test
public void testMultipleMRJobs() throws IOException, ExecException {
+ Assume.assumeTrue("Skip this test for TEZ. Assert is done only for first MR job",
+ Util.isMapredExecType(cluster.getExecType()));
int count = 0;
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
int [] nos = new int[10];
@@ -413,38 +415,38 @@ public class TestCounters {
}
pw.close();
- for(int i = 0; i < 10; i++) {
+ for(int i = 0; i < 10; i++) {
if(nos[i] > 0) count ++;
}
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = order a by $0;");
pigServer.registerQuery("c = group b by $0;");
pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
ExecJob job = pigServer.store("d", "output");
PigStats pigStats = job.getStatistics();
-
+
InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
-
+
is.close();
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
-
+
System.out.println("============================================");
System.out.println("Test case MultipleMRJobs");
System.out.println("============================================");
-
+
JobGraph jp = pigStats.getJobGraph();
- MRJobStats js = (MRJobStats)jp.getSinks().get(0);
-
+ JobStats js = (JobStats)jp.getSinks().get(0);
+
System.out.println("Job id: " + js.getName());
System.out.println(jp.toString());
-
+
System.out.println("Map input records : " + js.getMapInputRecords());
assertEquals(MAX, js.getMapInputRecords());
System.out.println("Map output records : " + js.getMapOutputRecords());
@@ -453,12 +455,12 @@ public class TestCounters {
assertEquals(count, js.getReduceInputRecords());
System.out.println("Reduce output records : " + js.getReduceOutputRecords());
assertEquals(count, js.getReduceOutputRecords());
-
+
System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
assertEquals(filesize, js.getHdfsBytesWritten());
}
-
+
@Test
public void testMapOnlyMultiQueryStores() throws Exception {
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file));
@@ -467,8 +469,8 @@ public class TestCounters {
pw.println(t);
}
pw.close();
-
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+
+ PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file + "';");
@@ -479,22 +481,22 @@ public class TestCounters {
List<ExecJob> jobs = pigServer.executeBatch();
PigStats stats = jobs.get(0).getStatistics();
assertTrue(stats.getOutputLocations().size() == 2);
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-
+ JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
for (Long val : entry.values()) {
counter += val;
}
-
- assertEquals(MAX, counter);
- }
-
+
+ assertEquals(MAX, counter);
+ }
+
@Test
public void testMultiQueryStores() throws Exception {
int[] nums = new int[100];
@@ -505,13 +507,13 @@ public class TestCounters {
nums[t]++;
}
pw.close();
-
+
int groups = 0;
for (int i : nums) {
if (i > 0) groups++;
}
-
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+
+ PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file + "';");
@@ -525,29 +527,29 @@ public class TestCounters {
pigServer.registerQuery("store g into '/tmp/outout2';");
List<ExecJob> jobs = pigServer.executeBatch();
PigStats stats = jobs.get(0).getStatistics();
-
+
assertTrue(stats.getOutputLocations().size() == 2);
-
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
- MRJobStats js = (MRJobStats)stats.getJobGraph().getSinks().get(0);
-
+ JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+
Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
for (Long val : entry.values()) {
counter += val;
}
-
- assertEquals(groups, counter);
- }
-
- /*
+
+ assertEquals(groups, counter);
+ }
+
+ /*
* IMPORTANT NOTE:
* COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
* SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
- */
+ */
// @Test
// public void testLocal() throws IOException, ExecException {
// int count = 0;
@@ -566,7 +568,7 @@ public class TestCounters {
// }
// pw.close();
//
-// for(int i = 0; i < 10; i++)
+// for(int i = 0; i < 10; i++)
// if(nos[i] > 0)
// count ++;
//
@@ -580,56 +582,56 @@ public class TestCounters {
// pigServer.registerQuery("c = group b by $0;");
// pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
// PigStats pigStats = pigServer.store("d", "file://" + out.getAbsolutePath()).getStatistics();
-// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+// InputStream is = FileLocalizer.open(FileLocalizer.fullPath(out.getAbsolutePath(), pigServer.getPigContext()), cluster.getExecType(), pigServer.getPigContext().getDfs());
// long filesize = 0;
// while(is.read() != -1) filesize++;
-//
+//
// is.close();
// out.delete();
-//
+//
// //Map<String, Map<String, String>> stats = pigStats.getPigStats();
-//
+//
// assertEquals(10, pigStats.getRecordsWritten());
// assertEquals(110, pigStats.getBytesWritten());
//
// }
@Test
- public void testJoinInputCounters() throws Exception {
+ public void testJoinInputCounters() throws Exception {
testInputCounters("join");
}
-
+
@Test
- public void testCogroupInputCounters() throws Exception {
+ public void testCogroupInputCounters() throws Exception {
testInputCounters("cogroup");
}
-
+
@Test
- public void testSkewedInputCounters() throws Exception {
+ public void testSkewedInputCounters() throws Exception {
testInputCounters("skewed");
}
-
+
@Test
- public void testSelfJoinInputCounters() throws Exception {
+ public void testSelfJoinInputCounters() throws Exception {
testInputCounters("self-join");
}
-
+
private static boolean multiInputCreated = false;
-
+
private static int count = 0;
-
- private void testInputCounters(String keyword) throws Exception {
+
+ private void testInputCounters(String keyword) throws Exception {
String file1 = "multi-input1.txt";
String file2 = "multi-input2.txt";
-
+
String output = keyword;
-
+
if (keyword.equals("self-join")) {
file2 = file1;
keyword = "join";
}
-
- final int MAX_NUM_RECORDS = 100;
+
+ final int MAX_NUM_RECORDS = 100;
if (!multiInputCreated) {
PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, file1));
for (int i = 0; i < MAX_NUM_RECORDS; i++) {
@@ -637,7 +639,7 @@ public class TestCounters {
pw.println(t);
}
pw.close();
-
+
PrintWriter pw2 = new PrintWriter(Util.createInputFile(cluster, file2));
for (int i = 0; i < MAX_NUM_RECORDS; i++) {
int t = r.nextInt(100);
@@ -649,8 +651,8 @@ public class TestCounters {
pw2.close();
multiInputCreated = true;
}
-
- PigServer pigServer = new PigServer(ExecType.MAPREDUCE,
+
+ PigServer pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
pigServer.setBatchOn();
pigServer.registerQuery("a = load '" + file1 + "';");
@@ -661,7 +663,7 @@ public class TestCounters {
pigServer.registerQuery("c = join a by $0, b by $0 using 'skewed';");
}
ExecJob job = pigServer.store("c", output + "_output");
-
+
PigStats stats = job.getStatistics();
assertTrue(stats.isSuccessful());
List<InputStats> inputs = stats.getInputStats();
@@ -680,4 +682,46 @@ public class TestCounters {
}
}
}
+
+ @Test
+ public void testSplitUnionOutputCounters() throws Exception {
+ PigServer pigServer = new PigServer(cluster.getExecType(), cluster.getProperties());
+ PrintWriter pw = new PrintWriter(Util.createInputFile(cluster, "splitunion-input"));
+ for (int i = 0; i < 10; i++) {
+ pw.println(i);
+ }
+ pw.close();
+ String query =
+ "a = load 'splitunion-input';" +
+ "split a into b if $0 < 5, c otherwise;" +
+ "d = union b, c;";
+
+ pigServer.registerQuery(query);
+
+ ExecJob job = pigServer.store("d", "splitunion-output-0", "PigStorage");
+ PigStats stats1 = job.getStatistics();
+
+ query =
+ "a = load 'splitunion-input';" +
+ "split a into b if $0 < 3, c if $0 > 2 and $0 < 6, d if $0 > 5;" +
+ "e = distinct d;" +
+ "f = union b, c, e;";
+
+ pigServer.registerQuery(query);
+
+ job = pigServer.store("f", "splitunion-output-1", "PigStorage");
+ PigStats stats2 = job.getStatistics();
+
+ PigStats[] pigStats = new PigStats[]{stats1, stats2};
+ for (int i = 0; i < 2; i++) {
+ PigStats stats = pigStats[i];
+ assertTrue(stats.isSuccessful());
+ List<OutputStats> outputs = stats.getOutputStats();
+ assertEquals(1, outputs.size());
+ OutputStats output = outputs.get(0);
+ assertEquals("splitunion-output-" + i, output.getName());
+ assertEquals(10, output.getNumberRecords());
+ assertEquals(20, output.getBytes());
+ }
+ }
}