You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2010/06/23 19:29:34 UTC

svn commit: r957277 [3/3] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLaye...

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCounters.java Wed Jun 23 17:29:33 2010
@@ -18,31 +18,34 @@
 
 package org.apache.pig.test;
 
-import java.io.File;
-import java.io.FileOutputStream;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.PrintWriter;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
 
-import junit.framework.TestCase;
-
 import org.apache.hadoop.fs.Path;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.PigStats;
-import org.apache.pig.tools.pigstats.PigStatsUtil;
+import org.apache.pig.tools.pigstats.PigStats.JobGraph;
 import org.junit.AfterClass;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
+
 @RunWith(JUnit4.class)
-public class TestCounters extends TestCase {
+public class TestCounters {
     String file = "input.txt";
 
     static MiniCluster cluster = MiniCluster.buildCluster();
@@ -63,22 +66,21 @@ public class TestCounters extends TestCa
         for(int i = 0; i < MAX; i++) {
             int t = r.nextInt(100);
             pw.println(t);
-            if(t > 50)
-                count ++;
+            if(t > 50) count ++;
         }
         pw.close();
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
-        PigStats pigStats = pigServer.store("c", "output_map_only").getStatistics();
-
-        //PigStats pigStats = pigServer.getPigStats();
-        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        ExecJob job = pigServer.store("c", "output_map_only");
+        PigStats pigStats = job.getStatistics();
         
         //counting the no. of bytes in the output file
         //long filesize = cluster.getFileSystem().getFileStatus(new Path("output_map_only")).getLen();
-        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output_map_only", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
+                "output_map_only", pigServer.getPigContext()),
+                pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
         
@@ -90,34 +92,21 @@ public class TestCounters extends TestCa
         System.out.println("============================================");
         System.out.println("Test case Map Only");
         System.out.println("============================================");
-        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
-        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
-            System.out.println("============================================");
-            System.out.println("Job : " + entry.getKey());
-            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
-                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
-            }
-            System.out.println("============================================");
-        }
-
-        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
-        //System.out.println("Job Name : " + e.getKey());
-
-        Map<String, String> jobStats = e.getValue();
-
-        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
-        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
-        assertNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
-        assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
-        assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
-        assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
 
-        assertEquals(count, pigStats.getRecordsWritten());
-        assertEquals(filesize, pigStats.getBytesWritten());
+        JobGraph jg = pigStats.getJobGraph();
+        Iterator<JobStats> iter = jg.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();                    
+
+            System.out.println("Map input records : " + js.getMapInputRecords());
+            assertEquals(MAX, js.getMapInputRecords());
+            System.out.println("Map output records : " + js.getMapOutputRecords());
+            assertEquals(count, js.getMapOutputRecords());
+            assertEquals(0, js.getReduceInputRecords());
+            assertEquals(0, js.getReduceOutputRecords());
+            System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
+            assertEquals(filesize, js.getHdfsBytesWritten());
+        }
 
     }
 
@@ -136,49 +125,38 @@ public class TestCounters extends TestCa
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = filter a by $0 > 50;");
         pigServer.registerQuery("c = foreach b generate $0 - 50;");
-        //pigServer.store("c", "output_map_only");
-        PigStats pigStats = pigServer.store("c", "output_map_only", "BinStorage").getStatistics();
+        ExecJob job = pigServer.store("c", "output_map_only", "BinStorage");
+        PigStats pigStats = job.getStatistics();
         
-        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output_map_only", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath(
+                "output_map_only", pigServer.getPigContext()),
+                pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
         
         is.close();
 
-        Map<String, Map<String, String>> stats = pigStats.getPigStats();
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output_map_only"), true);
 
         System.out.println("============================================");
         System.out.println("Test case Map Only");
         System.out.println("============================================");
-        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
-        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
-            System.out.println("============================================");
-            System.out.println("Job : " + entry.getKey());
-            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
-                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
-            }
-            System.out.println("============================================");
-        }
-
-        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
-        //System.out.println("Job Name : " + e.getKey());
-
-        Map<String, String> jobStats = e.getValue();
-
-        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
-        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
-        assertNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
-        assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
-        assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
-        assertEquals(0, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
 
-        assertEquals(count, pigStats.getRecordsWritten());
+        JobGraph jp = pigStats.getJobGraph();
+        Iterator<JobStats> iter = jp.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+        
+            System.out.println("Map input records : " + js.getMapInputRecords());
+            assertEquals(MAX, js.getMapInputRecords());
+            System.out.println("Map output records : " + js.getMapOutputRecords());
+            assertEquals(count, js.getMapOutputRecords());
+            assertEquals(0, js.getReduceInputRecords());
+            assertEquals(0, js.getReduceOutputRecords());
+        }
+            
+        System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
 
@@ -198,56 +176,44 @@ public class TestCounters extends TestCa
         }
         pw.close();
 
-        for(int i = 0; i < 10; i++) 
-            if(nos[i] > 0)
-                count ++;
+        for(int i = 0; i < 10; i++) {
+            if(nos[i] > 0) count ++;
+        }
 
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
-        PigStats pigStats = pigServer.store("c", "output").getStatistics();
-        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        ExecJob job = pigServer.store("c", "output");
+        PigStats pigStats = job.getStatistics();
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+                pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
         
         is.close();
 
-        Map<String, Map<String, String>> stats = pigStats.getPigStats();
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
         System.out.println("============================================");
         System.out.println("Test case MapReduce");
         System.out.println("============================================");
-        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
-        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
-            System.out.println("============================================");
-            System.out.println("Job : " + entry.getKey());
-            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
-                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
-            }
-            System.out.println("============================================");
-        }
-
-        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
-        Map<String, String> jobStats = e.getValue();
-
-        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
-        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
-        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
-        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
-
-        assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
 
-        assertEquals(count, pigStats.getRecordsWritten());
+        JobGraph jp = pigStats.getJobGraph();
+        Iterator<JobStats> iter = jp.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            System.out.println("Map input records : " + js.getMapInputRecords());
+            assertEquals(MAX, js.getMapInputRecords());
+            System.out.println("Map output records : " + js.getMapOutputRecords());
+            assertEquals(MAX, js.getMapOutputRecords());
+            System.out.println("Reduce input records : " + js.getReduceInputRecords());
+            assertEquals(MAX, js.getReduceInputRecords());
+            System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+            assertEquals(count, js.getReduceOutputRecords());
+        }
+        System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
 
@@ -267,59 +233,46 @@ public class TestCounters extends TestCa
         }
         pw.close();
 
-        for(int i = 0; i < 10; i++) 
-            if(nos[i] > 0)
-                count ++;
+        for(int i = 0; i < 10; i++) {
+            if(nos[i] > 0) count ++;
+        }
 
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group;");
-        PigStats pigStats = pigServer.store("c", "output", "BinStorage").getStatistics();
+        ExecJob job = pigServer.store("c", "output", "BinStorage");
+        PigStats pigStats = job.getStatistics();
 
-        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+                pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
         
         is.close();
         
-        Map<String, Map<String, String>> stats = pigStats.getPigStats();
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
         System.out.println("============================================");
         System.out.println("Test case MapReduce");
         System.out.println("============================================");
-        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
-        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
-            System.out.println("============================================");
-            System.out.println("Job : " + entry.getKey());
-            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
-                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
-            }
-            System.out.println("============================================");
-        }
-
-        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
-        Map<String, String> jobStats = e.getValue();
-
-        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
-        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
-        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
-        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
-
-        assertNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
-        
-        assertEquals(count, pigStats.getRecordsWritten());
-        assertEquals(filesize, pigStats.getBytesWritten());
 
+        JobGraph jp = pigStats.getJobGraph();
+        Iterator<JobStats> iter = jp.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            System.out.println("Map input records : " + js.getMapInputRecords());
+            assertEquals(MAX, js.getMapInputRecords());
+            System.out.println("Map output records : " + js.getMapOutputRecords());
+            assertEquals(MAX, js.getMapOutputRecords());
+            System.out.println("Reduce input records : " + js.getReduceInputRecords());
+            assertEquals(MAX, js.getReduceInputRecords());
+            System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+            assertEquals(count, js.getReduceOutputRecords());
+        }
+        System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
+        assertEquals(filesize, pigStats.getBytesWritten());
     }
 
     @Test
@@ -338,59 +291,48 @@ public class TestCounters extends TestCa
         }
         pw.close();
 
-        for(int i = 0; i < 10; i++) 
-            if(nos[i] > 0)
-                count ++;
+        for(int i = 0; i < 10; i++) {
+            if(nos[i] > 0) count ++;
+        }
 
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
-        PigStats pigStats = pigServer.store("c", "output").getStatistics();
+        ExecJob job = pigServer.store("c", "output");
+        PigStats pigStats = job.getStatistics();
 
-        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+                pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
         
         is.close();
-        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+ 
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
-        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
-        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
-            System.out.println("============================================");
-            System.out.println("Job : " + entry.getKey());
-            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
-                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
-            }
-            System.out.println("============================================");
-        }
-
-        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
-        Map<String, String> jobStats = e.getValue();
-
-        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
-        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
-        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
-        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
-
-        assertNotNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
-
-        assertEquals(count, pigStats.getRecordsWritten());
+        
+        JobGraph jp = pigStats.getJobGraph();
+        Iterator<JobStats> iter = jp.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            System.out.println("Map input records : " + js.getMapInputRecords());
+            assertEquals(MAX, js.getMapInputRecords());
+            System.out.println("Map output records : " + js.getMapOutputRecords());
+            assertEquals(MAX, js.getMapOutputRecords());
+            System.out.println("Reduce input records : " + js.getReduceInputRecords());
+            assertEquals(count, js.getReduceInputRecords());
+            System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+            assertEquals(count, js.getReduceOutputRecords());
+        }
+        System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
-    
+     
     @Test
     public void testMapCombineReduceBinStorage() throws IOException, ExecException {
         int count = 0;
@@ -407,56 +349,44 @@ public class TestCounters extends TestCa
         }
         pw.close();
 
-        for(int i = 0; i < 10; i++) 
-            if(nos[i] > 0)
-                count ++;
+        for(int i = 0; i < 10; i++) {
+            if(nos[i] > 0) count ++;
+        }
 
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = group a by $0;");
         pigServer.registerQuery("c = foreach b generate group, SUM(a.$1);");
-        PigStats pigStats = pigServer.store("c", "output", "BinStorage").getStatistics();
-
-        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        ExecJob job = pigServer.store("c", "output", "BinStorage");
+        PigStats pigStats = job.getStatistics();
+        
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+                pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
         
         is.close();
-        Map<String, Map<String, String>> stats = pigStats.getPigStats();
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
 
         System.out.println("============================================");
         System.out.println("Test case MapCombineReduce");
         System.out.println("============================================");
-        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
-        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
-            System.out.println("============================================");
-            System.out.println("Job : " + entry.getKey());
-            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
-                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
-            }
-            System.out.println("============================================");
-        }
-
-        Map.Entry<String, Map<String, String>> e = stats.entrySet().iterator().next();
-
-        Map<String, String> jobStats = e.getValue();
-
-        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
-        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
-        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
-        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
-
-        assertNotNull(jobStats.get("PIG_STATS_COMBINE_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_MAP_PLAN"));
-        assertNotNull(jobStats.get("PIG_STATS_REDUCE_PLAN"));
-
-        assertEquals(count, pigStats.getRecordsWritten());
+ 
+        JobGraph jp = pigStats.getJobGraph();
+        Iterator<JobStats> iter = jp.iterator();
+        while (iter.hasNext()) {
+            JobStats js = iter.next();
+            System.out.println("Map input records : " + js.getMapInputRecords());
+            assertEquals(MAX, js.getMapInputRecords());
+            System.out.println("Map output records : " + js.getMapOutputRecords());
+            assertEquals(MAX, js.getMapOutputRecords());
+            System.out.println("Reduce input records : " + js.getReduceInputRecords());
+            assertEquals(count, js.getReduceInputRecords());
+            System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+            assertEquals(count, js.getReduceOutputRecords());
+        }
+        System.out.println("Hdfs bytes written : " + pigStats.getBytesWritten());
         assertEquals(filesize, pigStats.getBytesWritten());
     }
 
@@ -476,52 +406,49 @@ public class TestCounters extends TestCa
         }
         pw.close();
 
-        for(int i = 0; i < 10; i++) 
-            if(nos[i] > 0)
-                count ++;
+        for(int i = 0; i < 10; i++) { 
+            if(nos[i] > 0) count ++;
+        }
 
         PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("a = load '" + file + "';");
         pigServer.registerQuery("b = order a by $0;");
         pigServer.registerQuery("c = group b by $0;");
         pigServer.registerQuery("d = foreach c generate group, SUM(b.$1);");
-        PigStats pigStats = pigServer.store("d", "output").getStatistics();
+        ExecJob job = pigServer.store("d", "output");
+        PigStats pigStats = job.getStatistics();
         
-        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output", pigServer.getPigContext()), ExecType.MAPREDUCE, pigServer.getPigContext().getDfs());
+        InputStream is = FileLocalizer.open(FileLocalizer.fullPath("output",
+                pigServer.getPigContext()), pigServer.getPigContext());
         long filesize = 0;
         while(is.read() != -1) filesize++;
         
         is.close();
-        Map<String, Map<String, String>> stats = pigStats.getPigStats();
+        
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("output"), true);
         
         System.out.println("============================================");
         System.out.println("Test case MultipleMRJobs");
         System.out.println("============================================");
-        System.out.println("MRPlan : \n" + pigStats.getMRPlan());
-        for(Map.Entry<String, Map<String, String>> entry : stats.entrySet()) {
-            System.out.println("============================================");
-            System.out.println("Job : " + entry.getKey());
-            for(Map.Entry<String, String> e1 : entry.getValue().entrySet()) {
-                System.out.println(" - " + e1.getKey() + " : \n" + e1.getValue());
-            }
-            System.out.println("============================================");
-        }
-
-        Map<String, String> jobStats = stats.get(pigStats.getRootJobIDs().get(0));
-
-        System.out.println("Map input records : " + jobStats.get("PIG_STATS_MAP_INPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_INPUT_RECORDS")));
-        System.out.println("Map output records : " + jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS"));
-        assertEquals(MAX, Integer.parseInt(jobStats.get("PIG_STATS_MAP_OUTPUT_RECORDS")));
-        System.out.println("Reduce input records : " + jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_INPUT_RECORDS")));
-        System.out.println("Reduce output records : " + jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS"));
-        assertEquals(count, Integer.parseInt(jobStats.get("PIG_STATS_REDUCE_OUTPUT_RECORDS")));
         
-        assertEquals(count, pigStats.getRecordsWritten());
-        assertEquals(filesize, pigStats.getBytesWritten());
+        JobGraph jp = pigStats.getJobGraph();
+        JobStats js = (JobStats)jp.getSinks().get(0);
+        
+        System.out.println("Job id: " + js.getName());
+        System.out.println(jp.toString());
+        
+        System.out.println("Map input records : " + js.getMapInputRecords());
+        assertEquals(MAX, js.getMapInputRecords());
+        System.out.println("Map output records : " + js.getMapOutputRecords());
+        assertEquals(MAX, js.getMapOutputRecords());
+        System.out.println("Reduce input records : " + js.getReduceInputRecords());
+        assertEquals(count, js.getReduceInputRecords());
+        System.out.println("Reduce output records : " + js.getReduceOutputRecords());
+        assertEquals(count, js.getReduceOutputRecords());
+        
+        System.out.println("Hdfs bytes written : " + js.getHdfsBytesWritten());
+        assertEquals(filesize, js.getHdfsBytesWritten());
 
     }
     
@@ -543,22 +470,19 @@ public class TestCounters extends TestCa
         pigServer.registerQuery("store b into '/tmp/outout1';");
         pigServer.registerQuery("store c into '/tmp/outout2';");
         List<ExecJob> jobs = pigServer.executeBatch();
-        
-        assertTrue(jobs != null && jobs.size() == 2);
-        
-        Map<String, Map<String, String>> stats = 
-            jobs.get(0).getStatistics().getPigStats();
+        PigStats stats = jobs.get(0).getStatistics();
+        assertTrue(stats.getOutputLocations().size() == 2);
         
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        Map<String, String> entry = 
-            stats.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
         
+        Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
-        for (String val : entry.values()) {
-            counter += new Long(val);
+        for (Long val : entry.values()) {
+            counter += val;
         }
         
         assertEquals(MAX, counter);       
@@ -593,32 +517,30 @@ public class TestCounters extends TestCa
         pigServer.registerQuery("store d into '/tmp/outout1';");
         pigServer.registerQuery("store g into '/tmp/outout2';");
         List<ExecJob> jobs = pigServer.executeBatch();
+        PigStats stats = jobs.get(0).getStatistics();
         
-        assertTrue(jobs != null && jobs.size() == 2);
-        
-        Map<String, Map<String, String>> stats = 
-            jobs.get(0).getStatistics().getPigStats();
-        
+        assertTrue(stats.getOutputLocations().size() == 2);
+               
         cluster.getFileSystem().delete(new Path(file), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout1"), true);
         cluster.getFileSystem().delete(new Path("/tmp/outout2"), true);
 
-        Map<String, String> entry = 
-            stats.get(PigStatsUtil.MULTI_STORE_COUNTER_GROUP);
+        JobStats js = (JobStats)stats.getJobGraph().getSinks().get(0);
         
+        Map<String, Long> entry = js.getMultiStoreCounters();
         long counter = 0;
-        for (String val : entry.values()) {
-            counter += new Long(val);      
+        for (Long val : entry.values()) {
+            counter += val;
         }
         
         assertEquals(groups, counter);       
     }    
     
-    /*
+    /*    
      * IMPORTANT NOTE:
      * COMMENTED OUT BECAUSE COUNTERS DO NOT CURRENTLY WORK IN LOCAL MODE -
      * SEE PIG-1286 - UNCOMMENT WHEN IT IS FIXED
-     */
+     */ 
 //    @Test
 //    public void testLocal() throws IOException, ExecException {
 //        int count = 0;

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=957277&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Jun 23 17:29:33 2010
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.PrintWriter;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.pig.PigRunner;
+import org.apache.pig.PigRunner.ReturnCode;
+import org.apache.pig.tools.pigstats.JobStats;
+import org.apache.pig.tools.pigstats.OutputStats;
+import org.apache.pig.tools.pigstats.PigStats;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPigRunner {
+    
+    private static MiniCluster cluster; 
+    
+    private static final String INPUT_FILE = "input";
+    private static final String OUTPUT_FILE = "output";
+    private static final String PIG_FILE = "test.pig";
+
+    @BeforeClass
+    public static void setUpBeforeClass() throws Exception {
+        cluster = MiniCluster.buildCluster();
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+        w.println("1\t2\t3");
+        w.println("5\t3\t4");
+        w.println("3\t4\t5");
+        w.println("5\t6\t7");
+        w.println("3\t7\t8");
+        w.close();
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+    }
+
+    @AfterClass
+    public static void tearDownAfterClass() throws Exception {
+        new File(INPUT_FILE).delete();
+        cluster.shutDown();
+    }
+
+    @Test
+    public void simpleTest() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w.println("B = group A by a0;");
+        w.println("C = foreach B generate group, COUNT(A);");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.close();
+        
+        try {
+            String[] args = { PIG_FILE };
+            PigStats stats = PigRunner.run(args);
+     
+            assertTrue(stats.isSuccessful());
+            
+            assertTrue(stats.getJobGraph().size() == 1);
+            String name = stats.getOutputNames().get(0);
+            assertEquals(OUTPUT_FILE, name);
+            assertEquals(12, stats.getBytesWritten());
+            assertEquals(3, stats.getRecordWritten());       
+            
+            assertEquals("A,B,C",
+                    ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+        }
+    }
+    
+    @Test
+    public void orderByTest() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w.println("B = order A by a0;");
+        w.println("C = limit B 2;");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.close();
+        String[] args = { PIG_FILE };
+        try {
+            PigStats stats = PigRunner.run(args);
+            assertTrue(stats.isSuccessful());
+            assertTrue(stats.getJobGraph().size() == 3);
+            assertTrue(stats.getJobGraph().getSinks().size() == 1);
+            assertTrue(stats.getJobGraph().getSources().size() == 1);
+            JobStats js = (JobStats) stats.getJobGraph().getSinks().get(0);
+            assertEquals(OUTPUT_FILE, js.getOutputs().get(0).getName());
+            assertEquals(2, js.getOutputs().get(0).getNumberRecords());
+            assertEquals(12, js.getOutputs().get(0).getBytes());
+            assertEquals(OUTPUT_FILE, stats.getOutputNames().get(0));
+            assertEquals(2, stats.getRecordWritten());
+            assertEquals(12, stats.getBytesWritten());
+            
+            assertEquals("A", ((JobStats) stats.getJobGraph().getSources().get(
+                    0)).getAlias());
+            assertEquals("B", ((JobStats) stats.getJobGraph().getPredecessors(
+                    js).get(0)).getAlias());
+            assertEquals("B", js.getAlias()); 
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+        }
+    }
+    
+    @Test
+    public void simpleMultiQueryTest() throws Exception {
+        final String OUTPUT_FILE_2 = "output2";
+        
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w.println("B = filter A by a0 >= 4;");
+        w.println("C = filter A by a0 < 4;");
+        w.println("store B into '" + OUTPUT_FILE_2 + "';");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.close();
+        
+        try {
+            String[] args = { PIG_FILE };
+            PigStats stats = PigRunner.run(args);
+            assertTrue(stats.isSuccessful());
+            assertTrue(stats.getJobGraph().size() == 1);
+            assertEquals(5, stats.getRecordWritten());
+            assertEquals(28, stats.getBytesWritten());
+            assertTrue(stats.getOutputNames().size() == 2);
+            for (String fname : stats.getOutputNames()) {
+                assertTrue(fname.equals(OUTPUT_FILE) || fname.equals(OUTPUT_FILE_2));
+                if (fname.equals(OUTPUT_FILE)) {
+                    assertEquals(3, stats.getNumberRecords(fname));
+                } else {
+                    assertEquals(2, stats.getNumberRecords(fname));
+                }                
+            }            
+            assertEquals("A,B,C",
+                    ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, OUTPUT_FILE_2);
+        }
+    }
+    
+    @Test
+    public void simpleMultiQueryTest2() throws Exception {
+        final String OUTPUT_FILE_2 = "output2";
+        
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w.println("B = filter A by a0 >= 4;");
+        w.println("C = filter A by a0 < 4;");
+        w.println("D = group C by a0;");
+        w.println("E = foreach D generate group, COUNT(C);");
+        w.println("store B into '" + OUTPUT_FILE_2 + "';");
+        w.println("store E into '" + OUTPUT_FILE + "';");
+        w.close();
+        
+        try {
+            String[] args = { PIG_FILE };
+            PigStats stats = PigRunner.run(args);
+            assertTrue(stats.isSuccessful());
+            assertTrue(stats.getJobGraph().size() == 1);
+            assertEquals(4, stats.getRecordWritten());           
+            assertEquals(18, stats.getBytesWritten());
+            assertTrue(stats.getOutputNames().size() == 2);
+            for (String fname : stats.getOutputNames()) {               
+                assertTrue(fname.equals(OUTPUT_FILE) || fname.equals(OUTPUT_FILE_2));
+                if (fname.equals(OUTPUT_FILE)) {
+                    assertEquals(2, stats.getNumberRecords(fname));
+                } else {
+                    assertEquals(2, stats.getNumberRecords(fname));
+                }                
+            }           
+            assertEquals("A,B,C,D,E",
+                    ((JobStats)stats.getJobGraph().getSinks().get(0)).getAlias());
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, OUTPUT_FILE_2);
+        }
+    }
+    
+    @Test
+    public void simpleNegativeTest() throws Exception {
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w.println("B = group A by a;");
+        w.println("C = foreach B generate group, COUNT(A);");
+        w.println("store C into '" + OUTPUT_FILE + "';");
+        w.close();
+        String[] args = { "-c", PIG_FILE };
+        PigStats stats = PigRunner.run(args);
+        assertTrue(stats.getReturnCode() == ReturnCode.PIG_EXCEPTION);
+        assertTrue(stats.getErrorCode() == 1000);
+        assertEquals("Error during parsing. Invalid alias: a in {a0: int,a1: int,a2: int}", 
+                stats.getErrorMessage());
+    }
+    
+    @Test
+    public void simpleNegativeTest2() throws Exception {
+        String[] args = { "-c", "-e", "this is a test" };
+        PigStats stats = PigRunner.run(args);        
+        assertTrue(stats.getReturnCode() == ReturnCode.ILLEGAL_ARGS);
+    }
+
+    @Test
+    public void simpleNegativeTest3() throws Exception {
+        String[] args = { "-c", "-y" };
+        PigStats stats = PigRunner.run(args);     
+        assertTrue(stats.getReturnCode() == ReturnCode.PARSE_EXCEPTION);
+        assertEquals("Found unknown option (-y) at position 2", 
+                stats.getErrorMessage());
+    }
+     
+    @Test
+    public void NagetiveTest() throws Exception {
+        final String OUTPUT_FILE_2 = "output2";
+        PrintWriter w = new PrintWriter(new FileWriter(PIG_FILE));
+        w.println("A = load '" + INPUT_FILE + "' as (a0:int, a1:int, a2:int);");
+        w.println("B = foreach A generate 1;");
+        w.println("C = foreach A generate 0/0;");
+        w.println("store B into '" + OUTPUT_FILE + "';");
+        w.println("store C into '" + OUTPUT_FILE_2 + "';");
+        w.println("D = load '" + OUTPUT_FILE_2 + "';");
+        w.println("E = stream D through `false`;");
+        w.println("store E into 'ee';");
+        w.close(); 
+        
+        try {
+            String[] args = { PIG_FILE };
+            PigStats stats = PigRunner.run(args); 
+            System.out.println("++++ code: " + stats.getReturnCode());
+            assertTrue(!stats.isSuccessful());            
+            assertTrue(stats.getReturnCode() == ReturnCode.PARTIAL_FAILURE);
+            assertTrue(stats.getJobGraph().size() == 2);
+            JobStats job = (JobStats)stats.getJobGraph().getSources().get(0);
+            assertTrue(job.isSuccessful());
+            job = (JobStats)stats.getJobGraph().getSinks().get(0);
+            assertTrue(!job.isSuccessful());
+            assertTrue(stats.getOutputStats().size() == 3);
+            for (OutputStats output : stats.getOutputStats()) {
+                if (output.getName().equals("ee")) {
+                    assertTrue(!output.isSuccessful());
+                } else {
+                    assertTrue(output.isSuccessful());
+                }
+            }
+        } finally {
+            new File(PIG_FILE).delete();
+            Util.deleteFile(cluster, OUTPUT_FILE);
+            Util.deleteFile(cluster, OUTPUT_FILE_2);
+        }
+    }
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java?rev=957277&r1=957276&r2=957277&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigStats.java Wed Jun 23 17:29:33 2010
@@ -25,7 +25,7 @@ import junit.framework.TestCase;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
-import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.backend.executionengine.ExecJob;
 import org.apache.pig.tools.pigstats.PigStats;
 
 public class TestPigStats extends TestCase {
@@ -38,10 +38,9 @@ public class TestPigStats extends TestCa
             String filePath = outputFile.getAbsolutePath();
             outputFile.delete();
             PigServer pig = new PigServer(ExecType.LOCAL);
-            pig
-                    .registerQuery("A = load 'test/org/apache/pig/test/data/passwd';");
-            PigStats stats = pig.store("A", filePath)
-                    .getStatistics();
+            pig.registerQuery("A = load 'test/org/apache/pig/test/data/passwd';");
+            ExecJob job = pig.store("A", filePath);
+            PigStats stats = job.getStatistics();
             File dataFile = new File( outputFile.getAbsoluteFile() + File.separator + "part-00000" );
             assertEquals(dataFile.length(), stats.getBytesWritten());
         } catch (IOException e) {
@@ -55,7 +54,6 @@ public class TestPigStats extends TestCa
                 deleteDirectory(outputFile);
             }
         }
-
     }
     
     private void deleteDirectory( File dir ) {