Posted to commits@pig.apache.org by rd...@apache.org on 2010/06/23 19:29:34 UTC
svn commit: r957277 [3/3] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLaye...
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Wed Jun 23 17:29:33 2010
@@ -18,31 +18,34 @@
package org.apache.pig.test;
-import java.io.File;
-import java.io.FileOutputStream;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Random;
-import junit.framework.TestCase;
-
import org.apache.hadoop.fs.Path;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.experimental.plan.Operator;
import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.tools.pigstats.JobStats;
import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
import org.junit.AfterClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+
@RunWith(JUnit4.class)
-public class TestCounters extends TestCase {
+public class TestCounters {
String file = "input.txt";
static MiniCluster cluster = MiniCluster.buildCluster();
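This hunk moves TestCounters off JUnit 3 (junit.framework.TestCase) and onto JUnit 4: assertions come from static org.junit.Assert imports and the class is driven by @RunWith(JUnit4.class) instead of inheritance. A minimal sketch of that style, with a hypothetical class and test name, looks like:

import static org.junit.Assert.assertEquals;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

// Illustrative only -- not part of this commit.
@RunWith(JUnit4.class)
public class ExampleJUnit4Test {      // no "extends TestCase"
    @Test
    public void addsCorrectly() {
        // assertEquals is resolved via the static import above
        assertEquals(4, 2 + 2);
    }
}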
@@ -63,22 +66,21 @@ public class TestCounters extends TestCa
for(int i = 0; i < MAX; i++) {
int t = r.nextInt(100);
pw.println(t);
- if(t > 50)
- count ++;
+ if(t > 50) count ++;
}
pw.close();
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
- PigStats pigStats = pigServer.store("c", "output_map_only").getStatistics();
-
- //PigStats pigStats = pigServer.getPigStats();
- Map<String, Map<String, String>> stats = pigStats.getPigStats();
+ ExecJob job = pigServer.store("c", "output_map_only");
+ PigStats pigStats = job.getStatistics();
//counting the no. of bytes in the output file
//long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
- InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output_map_only", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+ InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
+ "output_map_only", pigServer.getPigContext()),
+ pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
@@ -90,34 +92,21 @@ public class TestCounters extends TestCa
System.out.println("============================================");
System.out.println("Test case Map Only");
System.out.println("============================================");
- System.out.println("MRPlan : \n" + pigStats.getMRPlan());
- for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
- System.out.println("============================================");
- System.out.println("Job : " + entry.getKey());
- for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
- System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
- }
- System.out.println("============================================");
- }
-
- Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
- //System.out.println("Job Name : " + e.getKey());
-
- Map<String, String> jobStats = e.getValue();
-
- System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
- System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
- assertNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
- assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
- assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
- assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
- assertEquals(count, pigStats.getRecordsWritten());
- assertEquals(filesize, pigStats.getBytesWritten());
+ JobGraph jg = pigStats.getJobGraph();
+ Iterator<JobStats> iter = jg.iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+
+ System.out.println("Map input records : " + js.getMapInputRecords());
+ assertEquals(MAX, js.getMapInputRecords());
+ System.out.println("Map output records : " + js.getMapOutputRecords());
+ assertEquals(count, js.getMapOutputRecords());
+ assertEquals(0, js.getReduceInputRecords());
+ assertEquals(0, js.getReduceOutputRecords());
+ System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
+ assertEquals(filesize, js.getHdfsBytesWritten());
+ }
}
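The rewritten assertions above show the new statistics API: instead of pulling string values out of pigStats.getPigStats() by keys such as PIG_STATS_MAP_INPUT_RECORDS and running them through Integer.parseInt, callers take the JobGraph from PigStats.getJobGraph() and read typed counters off each JobStats. A minimal sketch of that consumption pattern, reusing the imports added above (helper name and output format are illustrative, not part of this commit):

// Dump the per-job counters of every MapReduce job a script compiled to.
static void dumpJobCounters(PigStats pigStats) {
    Iterator<JobStats> it = pigStats.getJobGraph().iterator();
    while (it.hasNext()) {
        JobStats js = it.next();
        System.out.println("map in/out    : " + js.getMapInputRecords()
                + " / " + js.getMapOutputRecords());
        System.out.println("reduce in/out : " + js.getReduceInputRecords()
                + " / " + js.getReduceOutputRecords());
        System.out.println("hdfs written  : " + js.getHdfsBytesWritten());
    }
}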
@@ -136,49 +125,38 @@ public class TestCounters extends TestCa
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = filter a by $0 > 50;");
pigServer.registerQuery("c = foreach b generate $0 - 50;");
- //pigServer.store("c", "output_map_only");
- PigStats pigStats = pigServer.store("c", "output_map_only", "BinStorage").getStatistics();
+ ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
+ PigStats pigStats = job.getStatistics();
- InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output_map_only", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+ InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
+ "output_map_only", pigServer.getPigContext()),
+ pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
- Map<String, Map<String, String>> stats = pigStats.getPigStats();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output_map_only"), true);
System.out.println("============================================");
System.out.println("Test case Map Only");
System.out.println("============================================");
- System.out.println("MRPlan : \n" + pigStats.getMRPlan());
- for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
- System.out.println("============================================");
- System.out.println("Job : " + entry.getKey());
- for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
- System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
- }
- System.out.println("============================================");
- }
-
- Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
- //System.out.println("Job Name : " + e.getKey());
-
- Map<String, String> jobStats = e.getValue();
-
- System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
- System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
- assertNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
- assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
- assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
- assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
- assertEquals(count, pigStats.getRecordsWritten());
+ JobGraph jp = pigStats.getJobGraph();
+ Iterator<JobStats> iter = jp.iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+
+ System.out.println("Map input records : " + js.getMapInputRecords());
+ assertEquals(MAX, js.getMapInputRecords());
+ System.out.println("Map output records : " + js.getMapOutputRecords());
+ assertEquals(count, js.getMapOutputRecords());
+ assertEquals(0, js.getReduceInputRecords());
+ assertEquals(0, js.getReduceOutputRecords());
+ }
+
+ System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
@@ -198,56 +176,44 @@ public class TestCounters extends TestCa
}
pw.close();
- for(int i = 0; i < 10; i++)
- if(nos[i] > 0)
- count ++;
+ for(int i = 0; i < 10; i++) {
+ if(nos[i] > 0) count ++;
+ }
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
- PigStats pigStats = pigServer.store("c", "output").getStatistics();
- InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+ ExecJob job = pigServer.store("c", "output");
+ PigStats pigStats = job.getStatistics();
+ InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+ pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
- Map<String, Map<String, String>> stats = pigStats.getPigStats();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapReduce");
System.out.println("============================================");
- System.out.println("MRPlan : \n" + pigStats.getMRPlan());
- for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
- System.out.println("============================================");
- System.out.println("Job : " + entry.getKey());
- for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
- System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
- }
- System.out.println("============================================");
- }
-
- Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
- Map<String, String> jobStats = e.getValue();
-
- System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
- System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
- System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
- System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
-
- assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
- assertEquals(count, pigStats.getRecordsWritten());
+ JobGraph jp = pigStats.getJobGraph();
+ Iterator<JobStats> iter = jp.iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+ System.out.println("Map input records : " + js.getMapInputRecords());
+ assertEquals(MAX, js.getMapInputRecords());
+ System.out.println("Map output records : " + js.getMapOutputRecords());
+ assertEquals(MAX, js.getMapOutputRecords());
+ System.out.println("Reduce input records : " + js.getReduceInputRecords());
+ assertEquals(MAX, js.getReduceInputRecords());
+ System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+ assertEquals(count, js.getReduceOutputRecords());
+ }
+ System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
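Throughout the file the three-argument FileLocalizer.open(path, ExecType, DataStorage) call is replaced by the two-argument overload that takes the PigContext directly. The byte-counting idiom the tests rely on then reads as follows (a sketch; the output path and the surrounding method's throws IOException clause are assumptions):

InputStream is = FileLocalizer.open(
        FileLocalizer.fullPath("output", pigServer.getPigContext()),
        pigServer.getPigContext());
long filesize = 0;
while (is.read() != -1) filesize++;   // count every byte of the stored result
is.close();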
@@ -267,59 +233,46 @@ public class TestCounters extends TestCa
}
pw.close();
- for(int i = 0; i < 10; i++)
- if(nos[i] > 0)
- count ++;
+ for(int i = 0; i < 10; i++) {
+ if(nos[i] > 0) count ++;
+ }
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group;");
- PigStats pigStats = pigServer.store("c", "output", "BinStorage").getStatistics();
+ ExecJob job = pigServer.store("c", "output", "BinStorage");
+ PigStats pigStats = job.getStatistics();
- InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+ InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+ pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
- Map<String, Map<String, String>> stats = pigStats.getPigStats();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapReduce");
System.out.println("============================================");
- System.out.println("MRPlan : \n" + pigStats.getMRPlan());
- for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
- System.out.println("============================================");
- System.out.println("Job : " + entry.getKey());
- for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
- System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
- }
- System.out.println("============================================");
- }
-
- Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
- Map<String, String> jobStats = e.getValue();
-
- System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
- System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
- System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
- System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
-
- assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
-
- assertEquals(count, pigStats.getRecordsWritten());
- assertEquals(filesize, pigStats.getBytesWritten());
+ JobGraph jp = pigStats.getJobGraph();
+ Iterator<JobStats> iter = jp.iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+ System.out.println("Map input records : " + js.getMapInputRecords());
+ assertEquals(MAX, js.getMapInputRecords());
+ System.out.println("Map output records : " + js.getMapOutputRecords());
+ assertEquals(MAX, js.getMapOutputRecords());
+ System.out.println("Reduce input records : " + js.getReduceInputRecords());
+ assertEquals(MAX, js.getReduceInputRecords());
+ System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+ assertEquals(count, js.getReduceOutputRecords());
+ }
+ System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
+ assertEquals(filesize, pigStats.getBytesWritten());
}
@Test
@@ -338,59 +291,48 @@ public class TestCounters extends TestCa
}
pw.close();
- for(int i = 0; i < 10; i++)
- if(nos[i] > 0)
- count ++;
+ for(int i = 0; i < 10; i++) {
+ if(nos[i] > 0) count ++;
+ }
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
- PigStats pigStats = pigServer.store("c", "output").getStatistics();
+ ExecJob job = pigServer.store("c", "output");
+ PigStats pigStats = job.getStatistics();
- InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+ InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+ pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
- Map<String, Map<String, String>> stats = pigStats.getPigStats();
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
- System.out.println("MRPlan : \n" + pigStats.getMRPlan());
- for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
- System.out.println("============================================");
- System.out.println("Job : " + entry.getKey());
- for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
- System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
- }
- System.out.println("============================================");
- }
-
- Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
- Map<String, String> jobStats = e.getValue();
-
- System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
- System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
- System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
- System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
-
- assertNotNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
-
- assertEquals(count, pigStats.getRecordsWritten());
+
+ JobGraph jp = pigStats.getJobGraph();
+ Iterator<JobStats> iter = jp.iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+ System.out.println("Map input records : " + js.getMapInputRecords());
+ assertEquals(MAX, js.getMapInputRecords());
+ System.out.println("Map output records : " + js.getMapOutputRecords());
+ assertEquals(MAX, js.getMapOutputRecords());
+ System.out.println("Reduce input records : " + js.getReduceInputRecords());
+ assertEquals(count, js.getReduceInputRecords());
+ System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+ assertEquals(count, js.getReduceOutputRecords());
+ }
+ System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
-
+
@Test
public void testMapCombineReduceBinStorage() throws IOException, ExecException {
int count = 0;
@@ -407,56 +349,44 @@ public class TestCounters extends TestCa
}
pw.close();
- for(int i = 0; i < 10; i++)
- if(nos[i] > 0)
- count ++;
+ for(int i = 0; i < 10; i++) {
+ if(nos[i] > 0) count ++;
+ }
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = group a by $0;");
pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
- PigStats pigStats = pigServer.store("c", "output", "BinStorage").getStatistics();
-
- InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+ ExecJob job = pigServer.store("c", "output", "BinStorage");
+ PigStats pigStats = job.getStatistics();
+
+ InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+ pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
- Map<String, Map<String, String>> stats = pigStats.getPigStats();
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MapCombineReduce");
System.out.println("============================================");
- System.out.println("MRPlan : \n" + pigStats.getMRPlan());
- for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
- System.out.println("============================================");
- System.out.println("Job : " + entry.getKey());
- for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
- System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
- }
- System.out.println("============================================");
- }
-
- Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
- Map<String, String> jobStats = e.getValue();
-
- System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
- System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
- System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
- System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
-
- assertNotNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
- assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
-
- assertEquals(count, pigStats.getRecordsWritten());
+
+ JobGraph jp = pigStats.getJobGraph();
+ Iterator<JobStats> iter = jp.iterator();
+ while (iter.hasNext()) {
+ JobStats js = iter.next();
+ System.out.println("Map input records : " + js.getMapInputRecords());
+ assertEquals(MAX, js.getMapInputRecords());
+ System.out.println("Map output records : " + js.getMapOutputRecords());
+ assertEquals(MAX, js.getMapOutputRecords());
+ System.out.println("Reduce input records : " + js.getReduceInputRecords());
+ assertEquals(count, js.getReduceInputRecords());
+ System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+ assertEquals(count, js.getReduceOutputRecords());
+ }
+ System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
assertEquals(filesize, pigStats.getBytesWritten());
}
@@ -476,52 +406,49 @@ public class TestCounters extends TestCa
}
pw.close();
- for(int i = 0; i < 10; i++)
- if(nos[i] > 0)
- count ++;
+ for(int i = 0; i < 10; i++) {
+ if(nos[i] > 0) count ++;
+ }
PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
pigServer.registerQuery("a = load '" + file + "';");
pigServer.registerQuery("b = order a by $0;");
pigServer.registerQuery("c = group b by $0;");
pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
- PigStats pigStats = pigServer.store("d", "output").getStatistics();
+ ExecJob job = pigServer.store("d", "output");
+ PigStats pigStats = job.getStatistics();
- InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+ InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+ pigServer.getPigContext()), pigServer.getPigContext());
long filesize = 0;
while(is.read() != -1) filesize++;
is.close();
- Map<String, Map<String, String>> stats = pigStats.getPigStats();
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("output"), true);
System.out.println("============================================");
System.out.println("Test case MultipleMRJobs");
System.out.println("============================================");
- System.out.println("MRPlan : \n" + pigStats.getMRPlan());
- for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
- System.out.println("============================================");
- System.out.println("Job : " + entry.getKey());
- for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
- System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
- }
- System.out.println("============================================");
- }
-
- Map<String, String> jobStats = stats.get(pigStats.getRootJobIDs().get(0));
-
- System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
- System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
- assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
- System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
- System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
- assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
- assertEquals(count, pigStats.getRecordsWritten());
- assertEquals(filesize, pigStats.getBytesWritten());
+ JobGraph jp = pigStats.getJobGraph();
+ JobStats js = (JobStats)jp.getSinks().get(0);
+
+ System.out.println("Job id: " + js.getName());
+ System.out.println(jp.toString());
+
+ System.out.println("Map input records : " + js.getMapInputRecords());
+ assertEquals(MAX, js.getMapInputRecords());
+ System.out.println("Map output records : " + js.getMapOutputRecords());
+ assertEquals(MAX, js.getMapOutputRecords());
+ System.out.println("Reduce input records : " + js.getReduceInputRecords());
+ assertEquals(count, js.getReduceInputRecords());
+ System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+ assertEquals(count, js.getReduceOutputRecords());
+
+ System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
+ assertEquals(filesize, js.getHdfsBytesWritten());
}
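For a script that compiles into more than one MapReduce job, the MultipleMRJobs test above no longer resolves stats through pigStats.getRootJobIDs(); it takes the sink of the JobGraph, i.e. the final store job, and reads that job's counters. Roughly (variable names are illustrative):

JobGraph jobGraph = pigStats.getJobGraph();
JobStats finalJob = (JobStats) jobGraph.getSinks().get(0);   // last job in the chain
System.out.println("final job id : " + finalJob.getName());
System.out.println("hdfs written : " + finalJob.getHdfsBytesWritten());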
@@ -543,22 +470,19 @@ public class TestCounters extends TestCa
pigServer.registerQuery("store b into '/tmp/outout1';");
pigServer.registerQuery("store c into '/tmp/outout2';");
List<ExecJob> jobs = pigServer.executeBatch();
-
- assertTrue(jobs != null && jobs.size() == 2);
-
- Map<String, Map<String, String>> stats =
- jobs.get(0).getStatistics().getPigStats();
+ PigStats stats = jobs.get(0).getStatistics();
+ assertTrue(stats.getOutputLocations().size() == 2);
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
- Map<String, String> entry =
- stats.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+ Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
- for (String val : entry.values()) {
- counter += new Long(val);
+ for (Long val : entry.values()) {
+ counter += val;
}
assertEquals(MAX, counter);
@@ -593,32 +517,30 @@ public class TestCounters extends TestCa
pigServer.registerQuery("store d into '/tmp/outout1';");
pigServer.registerQuery("store g into '/tmp/outout2';");
List<ExecJob> jobs = pigServer.executeBatch();
+ PigStats stats = jobs.get(0).getStatistics();
- assertTrue(jobs != null && jobs.size() == 2);
-
- Map<String, Map<String, String>> stats =
- jobs.get(0).getStatistics().getPigStats();
-
+ assertTrue(stats.getOutputLocations().size() == 2);
+
cluster.getFileSystem().delete(new Path(file), true);
cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
- Map<String, String> entry =
- stats.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+ JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
+ Map<String, Long> entry = js.getMultiStoreCounters();
long counter = 0;
- for (String val : entry.values()) {
- counter += new Long(val);
+ for (Long val : entry.values()) {
+ counter += val;
}
assertEquals(groups, counter);
}
- /*
+ /*
* IMPORTANT NOTE:
* COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
* SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
- */
+ */
// @Test
// public void testLocal() throws IOException, ExecException {
// int count = 0;
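The two multi-store tests above switch from the string-valued counters under PigStatsUtil.MULTI_STORE_COUNTER_GROUP to JobStats.getMultiStoreCounters(), which returns a Map<String, Long>, so the per-store totals can be summed without any parsing. A minimal sketch, assuming a PigStats named stats taken from one of the batch jobs:

JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);
long total = 0;
for (Long val : js.getMultiStoreCounters().values()) {
    total += val;   // one entry per store in the multi-store job
}
System.out.println("records written across all stores: " + total);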
Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=957277&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Jun 23 17:29:33 2010
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPigRunner {
+
+ private static MiniCluster cluster;
+
+ private static final String INPUT_FILE = "input";
+ private static final String OUTPUT_FILE = "output";
+ private static final String PIG_FILE = "test.pig";
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ cluster = MiniCluster.buildCluster();
+ PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+ w.println("1\t2\t3");
+ w.println("5\t3\t4");
+ w.println("3\t4\t5");
+ w.println("5\t6\t7");
+ w.println("3\t7\t8");
+ w.close();
+ Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ new File(INPUT_FILE).delete();
+ cluster.shutDown();
+ }
+
+ @Test
+ public void simpleTest() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w.println("B = group A by a0;");
+ w.println("C = foreach B generate group, COUNT(A);");
+ w.println("store C into '" + OUTPUT_FILE + "';");
+ w.close();
+
+ try {
+ String[] args = { PIG_FILE };
+ PigStats stats = PigRunner.run(args);
+
+ assertTrue(stats.isSuccessful());
+
+ assertTrue(stats.getJobGraph().size() == 1);
+ String name = stats.getOutputNames().get(0);
+ assertEquals(OUTPUT_FILE, name);
+ assertEquals(12, stats.getBytesWritten());
+ assertEquals(3, stats.getRecordWritten());
+
+ assertEquals("A,B,C",
+ ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ }
+ }
+
+ @Test
+ public void orderByTest() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w.println("B = order A by a0;");
+ w.println("C = limit B 2;");
+ w.println("store C into '" + OUTPUT_FILE + "';");
+ w.close();
+ String[] args = { PIG_FILE };
+ try {
+ PigStats stats = PigRunner.run(args);
+ assertTrue(stats.isSuccessful());
+ assertTrue(stats.getJobGraph().size() == 3);
+ assertTrue(stats.getJobGraph().getSinks().size() == 1);
+ assertTrue(stats.getJobGraph().getSources().size() == 1);
+ JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);
+ assertEquals(OUTPUT_FILE, js.getOutputs().get(0).getName());
+ assertEquals(2, js.getOutputs().get(0).getNumberRecords());
+ assertEquals(12, js.getOutputs().get(0).getBytes());
+ assertEquals(OUTPUT_FILE, stats.getOutputNames().get(0));
+ assertEquals(2, stats.getRecordWritten());
+ assertEquals(12, stats.getBytesWritten());
+
+ assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
+ 0)).getAlias());
+ assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+ js).get(0)).getAlias());
+ assertEquals("B", js.getAlias());
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ }
+ }
+
+ @Test
+ public void simpleMultiQueryTest() throws Exception {
+ final String OUTPUT_FILE_2 = "output2";
+
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w.println("B = filter A by a0 >= 4;");
+ w.println("C = filter A by a0 < 4;");
+ w.println("store B into '" + OUTPUT_FILE_2 + "';");
+ w.println("store C into '" + OUTPUT_FILE + "';");
+ w.close();
+
+ try {
+ String[] args = { PIG_FILE };
+ PigStats stats = PigRunner.run(args);
+ assertTrue(stats.isSuccessful());
+ assertTrue(stats.getJobGraph().size() == 1);
+ assertEquals(5, stats.getRecordWritten());
+ assertEquals(28, stats.getBytesWritten());
+ assertTrue(stats.getOutputNames().size() == 2);
+ for (String fname : stats.getOutputNames()) {
+ assertTrue(fname.equals(OUTPUT_FILE) || fname.equals(OUTPUT_FILE_2));
+ if (fname.equals(OUTPUT_FILE)) {
+ assertEquals(3, stats.getNumberRecords(fname));
+ } else {
+ assertEquals(2, stats.getNumberRecords(fname));
+ }
+ }
+ assertEquals("A,B,C",
+ ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ Util.deleteFile(cluster, OUTPUT_FILE_2);
+ }
+ }
+
+ @Test
+ public void simpleMultiQueryTest2() throws Exception {
+ final String OUTPUT_FILE_2 = "output2";
+
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w.println("B = filter A by a0 >= 4;");
+ w.println("C = filter A by a0 < 4;");
+ w.println("D = group C by a0;");
+ w.println("E = foreach D generate group, COUNT(C);");
+ w.println("store B into '" + OUTPUT_FILE_2 + "';");
+ w.println("store E into '" + OUTPUT_FILE + "';");
+ w.close();
+
+ try {
+ String[] args = { PIG_FILE };
+ PigStats stats = PigRunner.run(args);
+ assertTrue(stats.isSuccessful());
+ assertTrue(stats.getJobGraph().size() == 1);
+ assertEquals(4, stats.getRecordWritten());
+ assertEquals(18, stats.getBytesWritten());
+ assertTrue(stats.getOutputNames().size() == 2);
+ for (String fname : stats.getOutputNames()) {
+ assertTrue(fname.equals(OUTPUT_FILE) || fname.equals(OUTPUT_FILE_2));
+ if (fname.equals(OUTPUT_FILE)) {
+ assertEquals(2, stats.getNumberRecords(fname));
+ } else {
+ assertEquals(2, stats.getNumberRecords(fname));
+ }
+ }
+ assertEquals("A,B,C,D,E",
+ ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ Util.deleteFile(cluster, OUTPUT_FILE_2);
+ }
+ }
+
+ @Test
+ public void simpleNegativeTest() throws Exception {
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w.println("B = group A by a;");
+ w.println("C = foreach B generate group, COUNT(A);");
+ w.println("store C into '" + OUTPUT_FILE + "';");
+ w.close();
+ String[] args = { "-c", PIG_FILE };
+ PigStats stats = PigRunner.run(args);
+ assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION);
+ assertTrue(stats.getErrorCode() == 1000);
+ assertEquals("Error during parsing. Invalid alias: a in {a0: int,a1: int,a2: int}",
+ stats.getErrorMessage());
+ }
+
+ @Test
+ public void simpleNegativeTest2() throws Exception {
+ String[] args = { "-c", "-e", "this is a test" };
+ PigStats stats = PigRunner.run(args);
+ assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
+ }
+
+ @Test
+ public void simpleNegativeTest3() throws Exception {
+ String[] args = { "-c", "-y" };
+ PigStats stats = PigRunner.run(args);
+ assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
+ assertEquals("Found unknown option (-y) at position 2",
+ stats.getErrorMessage());
+ }
+
+ @Test
+ public void negativeTest() throws Exception {
+ final String OUTPUT_FILE_2 = "output2";
+ PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+ w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+ w.println("B = foreach A generate 1;");
+ w.println("C = foreach A generate 0/0;");
+ w.println("store B into '" + OUTPUT_FILE + "';");
+ w.println("store C into '" + OUTPUT_FILE_2 + "';");
+ w.println("D = load '" + OUTPUT_FILE_2 + "';");
+ w.println("E = stream D through `false`;");
+ w.println("store E into 'ee';");
+ w.close();
+
+ try {
+ String[] args = { PIG_FILE };
+ PigStats stats = PigRunner.run(args);
+ System.out.println("++++ code: " + stats.getReturnCode());
+ assertTrue(!stats.isSuccessful());
+ assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
+ assertTrue(stats.getJobGraph().size() == 2);
+ JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
+ assertTrue(job.isSuccessful());
+ job = (JobStats)stats.getJobGraph().getSinks().get(0);
+ assertTrue(!job.isSuccessful());
+ assertTrue(stats.getOutputStats().size() == 3);
+ for (OutputStats output : stats.getOutputStats()) {
+ if (output.getName().equals("ee")) {
+ assertTrue(!output.isSuccessful());
+ } else {
+ assertTrue(output.isSuccessful());
+ }
+ }
+ } finally {
+ new File(PIG_FILE).delete();
+ Util.deleteFile(cluster, OUTPUT_FILE);
+ Util.deleteFile(cluster, OUTPUT_FILE_2);
+ }
+ }
+}
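The new TestPigRunner exercises the PigRunner entry point: PigRunner.run(args) executes a script and hands back a PigStats that can be inspected for success, return code, the job graph and output statistics. A minimal sketch of that embedding pattern (the script name is only an example):

String[] args = { "test.pig" };
PigStats stats = PigRunner.run(args);
if (stats.isSuccessful()) {
    System.out.println("jobs run      : " + stats.getJobGraph().size());
    System.out.println("bytes written : " + stats.getBytesWritten());
} else {
    System.out.println("return code : " + stats.getReturnCode());
    System.out.println("error       : " + stats.getErrorMessage());
}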
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java Wed Jun 23 17:29:33 2010
@@ -25,7 +25,7 @@ import junit.framework.TestCase;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
-import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.tools.pigstats.PigStats;
public class TestPigStats extends TestCase {
@@ -38,10 +38,9 @@ public class TestPigStats extends TestCa
String filePath = outputFile.getAbsolutePath();
outputFile.delete();
PigServer pig = new PigServer(ExecType.LOCAL);
- pig
- .registerQuery("A = load 'test/org/apache/pig/test/data/passwd';");
- PigStats stats = pig.store("A", filePath)
- .getStatistics();
+ pig.registerQuery("A = load 'test/org/apache/pig/test/data/passwd';");
+ ExecJob job = pig.store("A", filePath);
+ PigStats stats = job.getStatistics();
File dataFile = new File( outputFile.getAbsoluteFile() + File.separator + "part-00000" );
assertEquals(dataFile.length(), stats.getBytesWritten());
} catch (IOException e) {
@@ -55,7 +54,6 @@ public class TestPigStats extends TestCa
deleteDirectory(outputFile);
}
}
-
}
private void deleteDirectory( File dir ) {