Posted to commits@pig.apache.org by pr...@apache.org on 2009/11/24 20:54:34 UTC

svn commit: r883836 [22/23] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/ contrib/zebra/ contrib/zebr...

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanBuilder.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanBuilder.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanBuilder.java Tue Nov 24 19:54:19 2009
@@ -297,7 +297,7 @@
     
     @Test
     public void testQuery22Fail() {
-        buildPlan("A = load 'a';");
+        buildPlan("A = load 'a' as (a:int, b: double);");
         try {
             buildPlan("B = group A by (*, $0);");
         } catch (AssertionFailedError e) {
@@ -329,15 +329,50 @@
 
     @Test
     public void testQuery23Fail() {
+        buildPlan("A = load 'a' as (a: int, b:double);");
+        buildPlan("B = load 'b';");
+        boolean exceptionThrown = false;
+        try {
+            buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+                        "do not match"));
+            exceptionThrown = true;
+        }
+        assertTrue(exceptionThrown);
+    }
+
+    @Test
+    public void testQuery23Fail2() {
         buildPlan("A = load 'a';");
         buildPlan("B = load 'b';");
+        boolean exceptionThrown = false;
         try {
-            buildPlan("C = group A by (*, $0), B by ($0, $1);");
+            buildPlan("C = cogroup A by (*, $0), B by ($0, $1);");
         } catch (AssertionFailedError e) {
-            assertTrue(e.getMessage().contains("Grouping attributes can either be star (*"));
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+            "the input has a schema"));
+            exceptionThrown = true;
         }
+        assertTrue(exceptionThrown);
+    }
+    
+    @Test
+    public void testQuery23Fail3() {
+        buildPlan("A = load 'a' as (a: int, b:double);");
+        buildPlan("B = load 'b' as (a:int);");
+        boolean exceptionThrown = false;
+        try {
+            buildPlan("C = cogroup A by *, B by *;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("The arity of cogroup/group by columns " +
+                        "do not match"));
+            exceptionThrown = true;
+        }
+        assertTrue(exceptionThrown);
     }
 
+    
     @Test
     public void testQuery24() {
         buildPlan("a = load 'a';");
@@ -1591,7 +1626,7 @@
     }
 
     @Test
-    public void testQuery110()  throws FrontendException, ParseException {
+    public void testQuery110Fail()  throws FrontendException, ParseException {
         LogicalPlan lp;
         LOLoad load;
         LOCogroup cogroup;
@@ -1600,13 +1635,16 @@
         lp = buildPlan("b = load 'two';");
 
         load = (LOLoad) lp.getLeaves().get(0);
-
+        boolean exceptionThrown = false;
+        try {
         lp = buildPlan("c = cogroup a by $0, b by *;");
-        cogroup = (LOCogroup) lp.getLeaves().get(0);
-
-        MultiMap<LogicalOperator, LogicalPlan> mapGByPlans = cogroup.getGroupByPlans();
-        LogicalPlan cogroupPlan = (LogicalPlan)(mapGByPlans.get(load).toArray())[0];
-        assertTrue(checkPlanForProjectStar(cogroupPlan) == true);
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+                    "the input has a schema"));
+            exceptionThrown = true;
+        }
+        assertTrue(exceptionThrown);
+        
 
     }
 
@@ -2051,6 +2089,37 @@
         fail();
     }
 
+    @Test
+    public void testCogroupByStarFailure1() {
+        boolean exceptionThrown = false;
+        try {
+            buildPlan(" a = load '1.txt' as (a0:int, a1:int);");
+            buildPlan(" b = load '2.txt'; ");
+            buildPlan("c = cogroup a by *, b by *;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+                    "the input has a schema"));
+            exceptionThrown = true;
+        }
+        assertEquals("An exception was expected but did " +
+                "not occur", true, exceptionThrown);
+    }
+
+    @Test
+    public void testCogroupByStarFailure2() {
+        boolean exceptionThrown = false;
+        try {
+            buildPlan(" a = load '1.txt' ;");
+            buildPlan(" b = load '2.txt' as (b0:int, b1:int); ");
+            buildPlan("c = cogroup a by *, b by *;");
+        } catch (AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+            "the input has a schema"));
+            exceptionThrown = true;
+        }
+        assertEquals("An exception was expected but did " +
+                "not occur", true, exceptionThrown);
+    }
     private void printPlan(LogicalPlan lp) {
         LOPrinter graphPrinter = new LOPrinter(System.err, lp);
         System.err.println("Printing the logical plan");
@@ -2131,5 +2200,5 @@
     Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
     Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
     Map<String, String> fileNameMap = new HashMap<String, String>();
-    PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties());
+    PigContext pigContext = new PigContext(ExecType.MAPREDUCE, new Properties());
 }
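
A minimal sketch of the rule these new negative tests pin down (the aliases
and paths below are illustrative, not part of the commit): group/cogroup
by * is accepted only when every input has a declared schema, and the
expanded key lists must match in arity. In the same buildPlan style as the
tests:

    // Both inputs carry two-field schemas, so "by *" expands to
    // equal-arity key lists and the plan builds without an
    // AssertionFailedError:
    buildPlan("A = load 'a' as (a:int, b:double);");
    buildPlan("B = load 'b' as (c:int, d:double);");
    buildPlan("C = cogroup A by *, B by *;");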

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java Tue Nov 24 19:54:19 2009
@@ -21,6 +21,7 @@
 import java.io.File;
 import java.io.FileWriter;
 import java.io.IOException;
+import java.io.OutputStream;
 import java.io.PrintWriter;
 import java.io.StringReader;
 import java.util.ArrayList;
@@ -33,19 +34,33 @@
 import junit.framework.Assert;
 import junit.framework.TestCase;
 
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.util.Progressable;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
-import org.apache.pig.backend.executionengine.util.ExecTools;
+import org.apache.pig.ResourceSchema;
+import org.apache.pig.StoreConfig;
+import org.apache.pig.StoreFunc;
 import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.executionengine.util.ExecTools;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.data.BagFactory;
+import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
@@ -53,14 +68,12 @@
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.OperatorPlan;
-import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.impl.util.LogUtils;
+import org.apache.pig.tools.grunt.GruntParser;
 import org.apache.pig.tools.pigscript.parser.ParseException;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.jobcontrol.Job;
 
 public class TestMultiQuery extends TestCase {
 
@@ -79,6 +92,285 @@
     public void tearDown() throws Exception {
         myPig = null;
     }
+    
+    @Test
+    public void testMultiQueryJiraPig1060() {
+
+        // test case: four stores over a chain of filters (PIG-1060); the plan should merge into a single map-reduce job
+
+        String INPUT_FILE = "pig-1060.txt";
+
+        try {
+
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("apple\t2");
+            w.println("apple\t12");
+            w.println("orange\t3");
+            w.println("orange\t23");
+            w.println("strawberry\t10");
+            w.println("strawberry\t34");
+
+            w.close();
+
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+            myPig.setBatchOn();
+
+            myPig.registerQuery("data = load '" + INPUT_FILE +
+                                "' as (name:chararray, gid:int);");
+            myPig.registerQuery("f1 = filter data by gid < 5;");
+            myPig.registerQuery("g1 = group f1 by name;");
+            myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
+            myPig.registerQuery("store p1 into '/tmp/output1';");
+
+            myPig.registerQuery("f2 = filter data by gid > 5;");
+            myPig.registerQuery("g2 = group f2 by name;");
+            myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
+            myPig.registerQuery("store p2 into '/tmp/output2';");
+
+            myPig.registerQuery("f3 = filter f2 by gid > 10;");
+            myPig.registerQuery("g3 = group f3 by name;");
+            myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
+            myPig.registerQuery("store p3 into '/tmp/output3';");
+
+            myPig.registerQuery("f4 = filter f3 by gid < 20;");
+            myPig.registerQuery("g4 = group f4 by name;");
+            myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
+            myPig.registerQuery("store p4 into '/tmp/output4';");
+
+            LogicalPlan lp = checkLogicalPlan(1, 4, 27);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 4, 35);
+
+            checkMRPlan(pp, 1, 1, 1);
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+     
+    @Test
+    public void testMultiQueryJiraPig1060_2() {
+
+        // test case: the same four-store plan as above, executed end to end
+
+        String INPUT_FILE = "pig-1060.txt";
+
+        try {
+
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("apple\t2");
+            w.println("apple\t12");
+            w.println("orange\t3");
+            w.println("orange\t23");
+            w.println("strawberry\t10");
+            w.println("strawberry\t34");
+
+            w.close();
+
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+
+            myPig.setBatchOn();
+
+            myPig.registerQuery("data = load '" + INPUT_FILE +
+            "' as (name:chararray, gid:int);");
+            myPig.registerQuery("f1 = filter data by gid < 5;");
+            myPig.registerQuery("g1 = group f1 by name;");
+            myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
+            myPig.registerQuery("store p1 into '/tmp/output1';");
+
+            myPig.registerQuery("f2 = filter data by gid > 5;");
+            myPig.registerQuery("g2 = group f2 by name;");
+            myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
+            myPig.registerQuery("store p2 into '/tmp/output2';");
+
+            myPig.registerQuery("f3 = filter f2 by gid > 10;");
+            myPig.registerQuery("g3 = group f3 by name;");
+            myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
+            myPig.registerQuery("store p3 into '/tmp/output3';");
+
+            myPig.registerQuery("f4 = filter f3 by gid < 20;");
+            myPig.registerQuery("g4 = group f4 by name;");
+            myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
+            myPig.registerQuery("store p4 into '/tmp/output4';");
+
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(4, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+
+    @Test
+    public void testMultiQueryJiraPig920() {
+
+        // test case: a simple diamond query
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by gid >= 5;");
+            myPig.registerQuery("d = cogroup c by $0, b by $0;");
+            myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
+            myPig.registerQuery("store e into '/tmp/output1';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 1, 10);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 1, 13);
+
+            checkMRPlan(pp, 1, 1, 1);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }       
+ 
+    @Test
+    public void testMultiQueryJiraPig920_1() {
+
+        // test case: a query with two diamonds
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by gid >= 5;");
+            myPig.registerQuery("d = filter a by uid >= 5;");
+            myPig.registerQuery("e = filter a by gid < 5;");
+            myPig.registerQuery("f = cogroup c by $0, b by $0;");
+            myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
+            myPig.registerQuery("store f1 into '/tmp/output1';");
+            myPig.registerQuery("g = cogroup d by $0, e by $0;");
+            myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
+            myPig.registerQuery("store g1 into '/tmp/output2';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 2, 17);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 23);
+
+            checkMRPlan(pp, 2, 2, 2);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }
+
+    @Test
+    public void testMultiQueryJiraPig920_2() {
+
+        // test case: execution of a query with two diamonds
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by gid >= 5;");
+            myPig.registerQuery("d = filter a by uid >= 5;");
+            myPig.registerQuery("e = filter a by gid < 5;");
+            myPig.registerQuery("f = cogroup c by $0, b by $0;");
+            myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
+            myPig.registerQuery("store f1 into '/tmp/output1';");
+            myPig.registerQuery("g = cogroup d by $0, e by $0;");
+            myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
+            myPig.registerQuery("store g1 into '/tmp/output2';");
+             
+            List<ExecJob> jobs = myPig.executeBatch();
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }            
+    
+    @Test
+    public void testMultiQueryJiraPig920_3() {
+
+        // test case: execution of a simple diamond query
+        
+        String INPUT_FILE = "pig-920.txt";
+        
+        try {
+            
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("apple\tapple\t100\t10");
+            w.println("apple\tapple\t200\t20");
+            w.println("orange\torange\t100\t10");
+            w.println("orange\torange\t300\t20");
+   
+            w.close();
+            
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+        
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load '" + INPUT_FILE +
+                                "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 300;");
+            myPig.registerQuery("c = filter a by gid > 10;");
+            myPig.registerQuery("d = cogroup c by $0, b by $0;");
+            myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
+                                   
+            Iterator<Tuple> iter = myPig.openIterator("e");
+
+            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "('apple',1L,2L)",
+                            "('orange',1L,1L)"
+                    });
+            
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+            }
+
+            assertEquals(expectedResults.size(), counter);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+            
+        }
+    }        
 
     @Test
     public void testMultiQueryWithDemoCase() {
@@ -327,6 +619,10 @@
             List<ExecJob> jobs = myPig.executeBatch();
             assertTrue(jobs.size() == 2);
 
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
@@ -449,7 +745,11 @@
             myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");            
             myPig.registerQuery("store d2 into '/tmp/output3';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -481,7 +781,7 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 25);
 
-            checkMRPlan(pp, 1, 1, 3);
+            checkMRPlan(pp, 1, 1, 2);
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -509,8 +809,13 @@
             myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
             myPig.registerQuery("store f1 into '/tmp/output2';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
 
+            assertTrue(jobs.size() == 2);
+            
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
@@ -577,8 +882,12 @@
             myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
             myPig.registerQuery("store d2 into '/tmp/output3';");
              
-            myPig.executeBatch();
-            
+            List<ExecJob> jobs = myPig.executeBatch();
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
+  
         } catch (Exception e) {
             e.printStackTrace();
             Assert.fail();
@@ -645,7 +954,12 @@
             myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
             myPig.registerQuery("store d2 into '/tmp/output3';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -713,7 +1027,12 @@
             myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
             myPig.registerQuery("store d2 into '/tmp/output3';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -826,7 +1145,12 @@
             myPig.registerQuery("H = foreach G generate group, COUNT(A1);");          
             myPig.registerQuery("store H into '/tmp/output3';");
              
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -892,7 +1216,12 @@
             myPig.registerQuery("g1 = foreach g generate group, COUNT(d2);");
             myPig.registerQuery("store g1 into '/tmp/output3';");
 
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -948,7 +1277,12 @@
             myPig.registerQuery("e = foreach d generate flatten(b), flatten(c);");
             myPig.registerQuery("store e into '/tmp/output2';");
 
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertTrue(jobs.size() == 2);
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -1006,7 +1340,12 @@
             myPig.registerQuery("e = join c by gid, d by gid using \"repl\";");
             myPig.registerQuery("store e into '/tmp/output3';");
 
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertEquals(3, jobs.size());
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -1115,7 +1454,12 @@
             myPig.registerQuery("b = load '/tmp/output1' using PigStorage(':'); ");
             myPig.registerQuery("store b into '/tmp/output2';");
 
-            myPig.executeBatch();
+            List<ExecJob> jobs = myPig.executeBatch();
+            assertTrue(jobs.size() == 2);
+
+            for (ExecJob job : jobs) {
+                assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+            }
 
         } catch (Exception e) {
             e.printStackTrace();
@@ -1433,7 +1777,7 @@
         }
         
     }
-    
+   
     @Test
     public void testMultiQueryWithTwoStores() {
 
@@ -1904,7 +2248,156 @@
             Assert.fail();
         } 
     }
+   
+    /**
+     * Test that Pig calls the checkOutputSpecs() method of the OutputFormat
+     * (if the StoreFunc defines an OutputFormat as the return value of
+     * {@link StoreFunc#getStorePreparationClass()}).
+     * @throws IOException
+     */
+    @Test
+    public void testMultiStoreWithOutputFormat() throws IOException {
+        Util.createInputFile(cluster, "input.txt", new String[] {"hello", "bye"});
+        String query = "a = load 'input.txt';" +
+        		"b = filter a by $0 < 10;" +
+        		"store b into 'output1' using "+DummyStoreWithOutputFormat.class.getName()+"();" +
+        		"c = group a by $0;" +
+        		"d = foreach c generate group, COUNT(a.$0);" +
+        		"store d into 'output2' using "+DummyStoreWithOutputFormat.class.getName()+"();" ;
+        myPig.setBatchOn();
+        Util.registerMultiLineQuery(myPig, query);
+        myPig.executeBatch();
+        
+        // check that files were created as a result of the
+        // checkOutputSpecs() method of the OutputFormat being called
+        FileSystem fs = cluster.getFileSystem();
+        assertEquals(true, fs.exists(new Path("output1_checkOutputSpec_test")));
+        assertEquals(true, fs.exists(new Path("output2_checkOutputSpec_test")));
+        Util.deleteFile(cluster, "input.txt");
+        Util.deleteFile(cluster, "output1_checkOutputSpec_test");
+        Util.deleteFile(cluster, "output2_checkOutputSpec_test");
+    }
+
+    public static class DummyStoreWithOutputFormat implements StoreFunc {
+        
+        /**
+         * 
+         */
+        public DummyStoreWithOutputFormat() {
+            // TODO Auto-generated constructor stub
+        }
+
+        
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#putNext(org.apache.pig.data.Tuple)
+         */
+        @Override
+        public void putNext(Tuple f) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#checkSchema(org.apache.pig.ResourceSchema)
+         */
+        @Override
+        public void checkSchema(ResourceSchema s) throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#getOutputFormat()
+         */
+        @Override
+        public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
+                throws IOException {
+            return null;
+        }
+
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#prepareToWrite(org.apache.hadoop.mapreduce.RecordWriter)
+         */
+        @Override
+        public void prepareToWrite(
+                org.apache.hadoop.mapreduce.RecordWriter writer)
+                throws IOException {
+            
+        }
+
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#relToAbsPathForStoreLocation(java.lang.String, org.apache.hadoop.fs.Path)
+         */
+        @Override
+        public String relToAbsPathForStoreLocation(String location, Path curDir)
+                throws IOException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+
+        /* (non-Javadoc)
+         * @see org.apache.pig.StoreFunc#setStoreLocation(java.lang.String, org.apache.hadoop.mapreduce.Job)
+         */
+        @Override
+        public void setStoreLocation(String location, Job job)
+                throws IOException {
+            // TODO Auto-generated method stub
+            
+        }
+                
+    }
+    
+    @SuppressWarnings({ "deprecation", "unchecked" })
+    public static class DummyOutputFormat
+    extends OutputFormat<WritableComparable, Tuple> {
+
+        public DummyOutputFormat() {
+            
+        }
+
+        /* (non-Javadoc)
+         * @see org.apache.hadoop.mapreduce.OutputFormat#checkOutputSpecs(org.apache.hadoop.mapreduce.JobContext)
+         */
+        @Override
+        public void checkOutputSpecs(JobContext context) throws IOException,
+                InterruptedException {
+            StoreConfig sConfig = MapRedUtil.getStoreConfig(context.
+                    getConfiguration());
+            FileSystem fs = FileSystem.get(context.getConfiguration());
+            // create a file to test that this method got called
+            fs.create(new Path(sConfig.getLocation() + "_checkOutputSpec_test"));
+            // TODO Auto-generated method stub
+            
+        }
+        /* (non-Javadoc)
+         * @see org.apache.hadoop.mapreduce.OutputFormat#getOutputCommitter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+         */
+        @Override
+        public OutputCommitter getOutputCommitter(TaskAttemptContext context)
+                throws IOException, InterruptedException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+        /* (non-Javadoc)
+         * @see org.apache.hadoop.mapreduce.OutputFormat#getRecordWriter(org.apache.hadoop.mapreduce.TaskAttemptContext)
+         */
+        @Override
+        public org.apache.hadoop.mapreduce.RecordWriter<WritableComparable, Tuple> getRecordWriter(
+                TaskAttemptContext context) throws IOException,
+                InterruptedException {
+            // TODO Auto-generated method stub
+            return null;
+        }
+        
+    }
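+
+    // Hedged sketch, not part of this commit: for the framework to reach
+    // DummyOutputFormat.checkOutputSpecs(), the StoreFunc above would hand
+    // the format back from getOutputFormat(), for example:
+    //
+    //     @Override
+    //     public org.apache.hadoop.mapreduce.OutputFormat getOutputFormat()
+    //             throws IOException {
+    //         return new DummyOutputFormat();
+    //     }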
 
+    
+    
     // --------------------------------------------------------------------------
     // Helper methods
 
@@ -2041,6 +2534,9 @@
 
         showPlanOperators(mrp);
         
+        System.out.println("===== Display map-reduce Plan =====");
+        System.out.println(mrp.toString());
+        
         Assert.assertEquals(expectedRoots, mrp.getRoots().size());
         Assert.assertEquals(expectedLeaves, mrp.getLeaves().size());
         Assert.assertEquals(expectedSize, mrp.size());

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQueryLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQueryLocal.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQueryLocal.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQueryLocal.java Tue Nov 24 19:54:19 2009
@@ -32,11 +32,6 @@
 import org.apache.pig.PigException;
 import org.apache.pig.PigServer;
 import org.apache.pig.backend.executionengine.ExecJob;
-import org.apache.pig.backend.executionengine.util.ExecTools;
-import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRCompiler;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -86,7 +81,7 @@
             LogicalPlan lp = checkLogicalPlan(1, 2, 9);
 
             // XXX Physical plan has one less node in the local case
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 12);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 2, 11);
 
             Assert.assertTrue(executePlan(pp));
 
@@ -186,7 +181,7 @@
 
             LogicalPlan lp = checkLogicalPlan(1, 3, 14);
 
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 17);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 14);
 
             Assert.assertTrue(executePlan(pp));
 
@@ -248,7 +243,7 @@
             LogicalPlan lp = checkLogicalPlan(2, 3, 16);
 
             // XXX the total number of ops is one less in the local case
-            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 21);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 2, 3, 19);
 
             Assert.assertTrue(executePlan(pp));
 
@@ -459,7 +454,7 @@
             myPig.registerQuery("store c into '/tmp/output5';");
 
             LogicalPlan lp = checkLogicalPlan(1, 3, 12);
-            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 19);
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 15);
 
             myPig.executeBatch();
             myPig.discardBatch(); 
@@ -536,7 +531,7 @@
     private PhysicalPlan checkPhysicalPlan(LogicalPlan lp, int expectedRoots,
             int expectedLeaves, int expectedSize) throws IOException {
 
-        System.out.println("===== check physical plan =====");
+        System.out.println("===== check physical plan =====");        
 
         PhysicalPlan pp = myPig.getPigContext().getExecutionEngine().compile(
                 lp, null);
@@ -565,16 +560,38 @@
     }
 
     private void deleteOutputFiles() {
-        try {
-            FileLocalizer.delete("/tmp/output1", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output2", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output3", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output4", myPig.getPigContext());
-            FileLocalizer.delete("/tmp/output5", myPig.getPigContext());
+        String outputFiles[] = { "/tmp/output1",
+                                 "/tmp/output2",
+                                 "/tmp/output3",
+                                 "/tmp/output4",
+                                 "/tmp/output5"
+                };
+        try {
+            for( String outputFile : outputFiles ) {
+                if( isDirectory(outputFile) ) {
+                    deleteDir( new File( outputFile ) );
+                } else {
+                    FileLocalizer.delete(outputFile, myPig.getPigContext());
+                }    
+            }            
         } catch (IOException e) {
             e.printStackTrace();
             Assert.fail();
         }
     }
+    
+    private void deleteDir( File file ) {
+        if( file.isDirectory() && file.listFiles().length != 0 ) {
+            for( File innerFile : file.listFiles() ) {
+                deleteDir( innerFile );
+            }
+        }
+        file.delete();
+    }
+    
+    private boolean isDirectory( String filepath ) {
+        File file = new File( filepath );
+        return file.isDirectory();
+    }
 
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java Tue Nov 24 19:54:19 2009
@@ -46,8 +46,8 @@
 public class TestPigContext extends TestCase {
 
     private static final String TMP_DIR_PROP = "/tmp/hadoop-hadoop";
-    private static final String FS_NAME = "machine:9000";
-    private static final String JOB_TRACKER = "machine:9001";
+    private static final String FS_NAME = "file:///";
+    private static final String JOB_TRACKER = "local";
 
     private File input;
     private PigContext pigContext;
@@ -68,7 +68,7 @@
         PigServer pigServer = new PigServer(pigContext);
         registerAndStore(pigServer);
         
-        check_asserts();
+        check_asserts(pigServer);
     }
 
     /**
@@ -79,7 +79,7 @@
         PigServer pigServer = new PigServer(ExecType.LOCAL, getProperties());
         registerAndStore(pigServer);
         
-        check_asserts();
+        check_asserts(pigServer);
     }
 
     /**
@@ -91,7 +91,7 @@
         PigServer pigServer = new PigServer(pigContext);
         registerAndStore(pigServer);
         
-        check_asserts();
+        check_asserts(pigServer);
     }
     
     @Test
@@ -218,7 +218,7 @@
     }
 
     private void registerAndStore(PigServer pigServer) throws IOException {
-        pigServer.debugOn();
+        // pigServer.debugOn();
         List<String> commands = getCommands();
         for (final String command : commands) {
             pigServer.registerQuery(command);
@@ -226,9 +226,9 @@
         pigServer.store("counts", input.getAbsolutePath() + ".out");
     }
 
-    private void check_asserts() {
-        assertEquals(JOB_TRACKER, pigContext.getProperties().getProperty("mapred.job.tracker"));
-        assertEquals(FS_NAME, pigContext.getProperties().getProperty("fs.default.name"));
-        assertEquals(TMP_DIR_PROP, pigContext.getProperties().getProperty("hadoop.tmp.dir"));
+    private void check_asserts(PigServer pigServer) {
+        assertEquals(JOB_TRACKER, pigServer.getPigContext().getProperties().getProperty("mapred.job.tracker"));
+        assertEquals(FS_NAME, pigServer.getPigContext().getProperties().getProperty("fs.default.name"));
+        assertEquals(TMP_DIR_PROP, pigServer.getPigContext().getProperties().getProperty("hadoop.tmp.dir"));
     }
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigServer.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigServer.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigServer.java Tue Nov 24 19:54:19 2009
@@ -103,6 +103,7 @@
             throws IOException {
         assertFalse((new File(name)).canRead());
         
+        System.err.println("Location: " + location);
         assertTrue((new File(location)).mkdirs());
         
         assertTrue((new File(location + FILE_SEPARATOR + name)).

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStats.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStats.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStats.java Tue Nov 24 19:54:19 2009
@@ -25,6 +25,7 @@
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.tools.pigstats.PigStats;
 
 public class TestPigStats extends TestCase {
@@ -34,19 +35,38 @@
         File outputFile = null;
         try {
             outputFile = File.createTempFile("JIAR_1027", ".out");
+            String filePath = outputFile.getAbsolutePath();
+            outputFile.delete();
             PigServer pig = new PigServer(ExecType.LOCAL);
             pig
                     .registerQuery("A = load 'test/org/apache/pig/test/data/passwd';");
-            PigStats stats = pig.store("A", outputFile.getAbsolutePath())
+            PigStats stats = pig.store("A", filePath)
                     .getStatistics();
-            assertEquals(outputFile.length(), stats.getBytesWritten());
+            File dataFile = new File( outputFile.getAbsoluteFile() + File.separator + "part-00000" );
+            assertEquals(dataFile.length(), stats.getBytesWritten());
         } catch (IOException e) {
+            e.printStackTrace();
+            System.err.println( e.getMessage() );
             fail("IOException happened");
         } finally {
             if (outputFile != null) {
-                outputFile.delete();
+                // Hadoop Local mode creates a directory
+                // Hence we need to delete a directory recursively
+                deleteDirectory(outputFile);
             }
         }
 
     }
+    
+    private void deleteDirectory( File dir ) {
+        File[] files = dir.listFiles();
+        for( File file : files ) {
+            if( file.isDirectory() ) {
+                deleteDirectory(file);
+            } else {
+                file.delete();
+            }
+        }
+        dir.delete();
+    }
 }

Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStorage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStorage.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStorage.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigStorage.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import static org.apache.pig.ExecType.MAPREDUCE;
+import static org.junit.Assert.*;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.Iterator;
+
+import junit.framework.Assert;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.Tuple;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestPigStorage {
+        
+    protected final Log log = LogFactory.getLog(getClass());
+    
+    private static MiniCluster cluster = MiniCluster.buildCluster();
+    private static PigServer pigServer = null;
+    
+    
+    @BeforeClass
+    public static void setup() {
+        try {
+            pigServer = new PigServer(MAPREDUCE, cluster.getProperties());
+        } catch (ExecException e) {
+            e.printStackTrace();
+            Assert.fail();
+        }
+    }
+    
+    @AfterClass
+    public static void shutdown() {
+        pigServer.shutdown();
+    }
+    
+    @Test
+    public void testBlockBoundary() {
+        
+        // This tests the PigStorage loader with records exactly on the block
+        // boundary: each record is 10 bytes, two per 20-byte block.
+        String[] inputs = {
+                "abcdefgh1", "abcdefgh2", "abcdefgh3", 
+                "abcdefgh4", "abcdefgh5", "abcdefgh6",
+                "abcdefgh7", "abcdefgh8", "abcdefgh9"
+        };
+        
+        String[] expected = {
+                "(abcdefgh1)", "(abcdefgh2)", "(abcdefgh3)", 
+                "(abcdefgh4)", "(abcdefgh5)", "(abcdefgh6)",
+                "(abcdefgh7)", "(abcdefgh8)", "(abcdefgh9)"
+        };
+        
+        System.setProperty("pig.overrideBlockSize", "20");
+        
+        String INPUT_FILE = "tmp.txt";
+        
+        try {
+                                    
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            for (String s : inputs) {
+                w.println(s);
+            }
+            w.close();
+            
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+            
+            pigServer.registerQuery("a = load 'file:" + INPUT_FILE + "';");
+            
+            Iterator<Tuple> iter = pigServer.openIterator("a");
+            int counter = 0;
+            while (iter.hasNext()){
+                assertEquals(expected[counter++].toString(), iter.next().toString());
+            }
+            
+            assertEquals(expected.length, counter);
+        
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    } 
+
+}

Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSecondarySort.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSecondarySort.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSecondarySort.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSecondarySort.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,498 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.executionengine.ExecJob;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.SecondaryKeyOptimizer;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.test.utils.LogicalPlanTester;
+import org.junit.Before;
+
+public class TestSecondarySort extends TestCase {
+    MiniCluster cluster = MiniCluster.buildCluster();
+    private PigServer pigServer;
+
+    static PigContext pc;
+    static{
+        pc = new PigContext(ExecType.MAPREDUCE,MiniCluster.buildCluster().getProperties());
+        try {
+            pc.connect();
+        } catch (ExecException e) {
+            e.printStackTrace();
+        }
+    }
+
+    @Before
+    @Override
+    public void setUp() throws Exception{
+        FileLocalizer.setR(new Random());
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+
+    public void testDistinctOptimization1() throws Exception{
+        // Limit in the foreach plan
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = LOAD 'input2' AS (b0, b1, b2);");
+        planTester.buildPlan("C = cogroup A by a0, B by b0;");
+        planTester.buildPlan("D = foreach C { E = limit A 10; F = E.a1; G = DISTINCT F; generate group, COUNT(G);};");
+        
+        LogicalPlan lp = planTester.buildPlan("store D into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==0);
+        assertTrue(so.getDistinctChanged()==1);
+    }
+    
+    public void testDistinctOptimization2() throws Exception{
+        // Distinct on one entire input 
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = distinct A; generate group, D;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==0);
+        assertTrue(so.getDistinctChanged()==1);
+    }
+    
+    public void testDistinctOptimization3() throws Exception{
+        // Distinct on the prefix of main sort key
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = A.a0; E = distinct D; generate group, E;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==0);
+        assertTrue(so.getNumSortRemoved()==0);
+        assertTrue(so.getDistinctChanged()==1);
+    }
+    
+    public void testDistinctOptimization4() throws Exception{
+        // Distinct on secondary key again, should remove
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = A.a1; E = distinct D; F = distinct E; generate group, F;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==0);
+        assertTrue(so.getDistinctChanged()==2);
+    }
+    
+    public void testDistinctOptimization5() throws Exception{
+        // Filter in foreach plan
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = A.a1; E = distinct D; F = filter E by $0=='1'; generate group, F;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==0);
+        assertTrue(so.getDistinctChanged()==1);
+    }
+    
+    public void testDistinctOptimization6() throws Exception{
+        // group by * with no schema, and distinct key is not part of main key
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1';");
+        planTester.buildPlan("B = group A by *;");
+        planTester.buildPlan("C = foreach B { D = limit A 10; E = D.$1; F = DISTINCT E; generate group, COUNT(F);};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==0);
+        assertTrue(so.getDistinctChanged()==1);
+    }
+
+    public void testDistinctOptimization7() throws Exception{
+        // group by * with no schema, distinct key is more specific than the main key
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1';");
+        planTester.buildPlan("B = group A by *;");
+        planTester.buildPlan("C = foreach B { D = limit A 10; E = D.$0; F = DISTINCT E; generate group, COUNT(F);};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==0);
+        assertTrue(so.getDistinctChanged()==1);
+    }
+    
+    public void testDistinctOptimization8() throws Exception{
+        // local arrange plan is an expression
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0+$1;");
+        planTester.buildPlan("C = foreach B { D = limit A 10; E = D.$0; F = DISTINCT E; generate group, COUNT(F);};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==0);
+        assertTrue(so.getDistinctChanged()==1);
+    }
+    
+    public void testDistinctOptimization9() throws Exception{
+        // local arrange plan is a nested project
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' as (a:tuple(a0:int, a1:chararray));");
+        planTester.buildPlan("B = group A by a.a1;");
+        planTester.buildPlan("C = foreach B { D = A.a; E = DISTINCT D; generate group, COUNT(E);};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==0);
+        assertTrue(so.getDistinctChanged()==1);
+    }
+    
+    public void testSortOptimization1() throws Exception{
+        // Sort on something other than the main key
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $1; generate group, E;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==1);
+        assertTrue(so.getDistinctChanged()==0);
+    }
+    
+    public void testSortOptimization2() throws Exception{
+        // Sort on the prefix of the main key
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $0; generate group, E;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==0);
+        assertTrue(so.getNumSortRemoved()==1);
+        assertTrue(so.getDistinctChanged()==0);
+    }
+    
+    public void testSortOptimization3() throws Exception{
+        // Mixed nested sorts: one not on the main key, one on the main key prefix
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $1; F = order E by $0; generate group, F;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==2);
+        assertTrue(so.getDistinctChanged()==0);
+    }
+    
+    public void testSortOptimization4() throws Exception{
+        // Nested sort key repeats the main key, then adds more columns
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $0, $1, $2; generate group, E;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==1);
+        assertTrue(so.getDistinctChanged()==0);
+    }
+    
+    public void testSortOptimization5() throws Exception{
+        // Two nested sorts on different keys; only one can be removed
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = limit A 10; E = order D by $1; F = order E by $2; generate group, F;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==1);
+        assertTrue(so.getDistinctChanged()==0);
+    }
+    
+    public void testSortOptimization6() throws Exception{
+        // Sort desc
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by $0;");
+        planTester.buildPlan("C = foreach B { D = order A by $0 desc; generate group, D;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==1);
+        assertTrue(so.getDistinctChanged()==0);
+    }
+    
+    public void testSortOptimization7() throws Exception{
+        // Sort asc on 1st key, desc on 2nd key
+        LogicalPlanTester planTester = new LogicalPlanTester() ;
+        planTester.buildPlan("A = LOAD 'input1' AS (a0, a1, a2);");
+        planTester.buildPlan("B = group A by ($0, $1);");
+        planTester.buildPlan("C = foreach B { D = order A by $0, $1 desc; generate group, D;};");
+        
+        LogicalPlan lp = planTester.buildPlan("store C into '/tmp';");
+        PhysicalPlan pp = Util.buildPhysicalPlan(lp, pc);
+        MROperPlan mrPlan = Util.buildMRPlan(pp, pc);
+        
+        SecondaryKeyOptimizer so = new SecondaryKeyOptimizer(mrPlan);
+        so.visit();
+        
+        assertTrue(so.getNumMRUseSecondaryKey()==1);
+        assertTrue(so.getNumSortRemoved()==1);
+        assertTrue(so.getDistinctChanged()==0);
+    }
+    
+    public void testNestedDistinctEndToEnd1() throws Exception{
+        File tmpFile1 = File.createTempFile("test", "txt");
+        PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+        ps1.println("1\t2\t3");
+        ps1.println("1\t3\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("2\t3\t4");
+        ps1.close();
+        
+        File tmpFile2 = File.createTempFile("test", "txt");
+        PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2));
+        ps2.println("1\t4\t4");
+        ps2.println("2\t3\t1");
+        ps2.close();
+        Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(), 
+                tmpFile1.getCanonicalPath());
+        Util.copyFromLocalToCluster(cluster, tmpFile2.getCanonicalPath(), 
+                tmpFile2.getCanonicalPath());
+
+        pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = LOAD '" + tmpFile2.getCanonicalPath() + "' AS (b0, b1, b2);");
+        pigServer.registerQuery("C = cogroup A by a0, B by b0 parallel 2;");
+        pigServer.registerQuery("D = foreach C { E = limit A 10; F = E.a1; G = DISTINCT F; generate group, COUNT(G);};");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        assertTrue(iter.hasNext());
+        assertTrue(iter.next().toString().equals("(1,2L)"));
+        assertTrue(iter.hasNext());
+        assertTrue(iter.next().toString().equals("(2,1L)"));
+        assertFalse(iter.hasNext());
+        Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
+        Util.deleteFile(cluster, tmpFile2.getCanonicalPath());
+    }
+    
+    public void testNestedDistinctEndToEnd2() throws Exception{
+        File tmpFile1 = File.createTempFile("test", "txt");
+        PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+        ps1.println("1\t2\t3");
+        ps1.println("1\t3\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("2\t3\t4");
+        ps1.close();
+        Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(), 
+                tmpFile1.getCanonicalPath());
+        pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = group A by $0 parallel 2;");
+        pigServer.registerQuery("C = foreach B { D = distinct A; generate group, D;};");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        assertTrue(iter.hasNext());
+        assertTrue(iter.next().toString().equals("(1,{(1,2,3),(1,2,4),(1,3,4)})"));
+        assertTrue(iter.hasNext());
+        assertTrue(iter.next().toString().equals("(2,{(2,3,4)})"));
+        assertFalse(iter.hasNext());
+        Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
+    }
+    
+    public void testNestedSortEndToEnd1() throws Exception{
+        File tmpFile1 = File.createTempFile("test", "txt");
+        PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+        ps1.println("1\t2\t3");
+        ps1.println("1\t3\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("2\t3\t4");
+        ps1.close();
+        Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(), 
+                tmpFile1.getCanonicalPath());
+        pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = group A by $0 parallel 2;");
+        pigServer.registerQuery("C = foreach B { D = limit A 10; E = order D by $1; generate group, E;};");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        assertTrue(iter.hasNext());
+        assertTrue(iter.next().toString().equals("(1,{(1,2,3),(1,2,4),(1,2,4),(1,2,4),(1,3,4)})"));
+        assertTrue(iter.hasNext());
+        assertTrue(iter.next().toString().equals("(2,{(2,3,4)})"));
+        assertFalse(iter.hasNext());
+        Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
+    }
+    
+    public void testNestedSortEndToEnd2() throws Exception{
+        File tmpFile1 = File.createTempFile("test", "txt");
+        PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+        ps1.println("1\t2\t3");
+        ps1.println("1\t3\t4");
+        ps1.println("1\t4\t4");
+        ps1.println("1\t2\t4");
+        ps1.println("1\t8\t4");
+        ps1.println("2\t3\t4");
+        ps1.close();
+        Util.copyFromLocalToCluster(cluster, tmpFile1.getCanonicalPath(), 
+                tmpFile1.getCanonicalPath());
+        pigServer.registerQuery("A = LOAD '" + tmpFile1.getCanonicalPath() + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = group A by $0 parallel 2;");
+        pigServer.registerQuery("C = foreach B { D = order A by a1 desc; generate group, D;};");
+        Iterator<Tuple> iter = pigServer.openIterator("C");
+        assertTrue(iter.hasNext());
+        assertTrue(iter.next().toString().equals("(1,{(1,8,4),(1,4,4),(1,3,4),(1,2,3),(1,2,4)})"));
+        assertTrue(iter.hasNext());
+        assertTrue(iter.next().toString().equals("(2,{(2,3,4)})"));
+        assertFalse(iter.hasNext());
+        Util.deleteFile(cluster, tmpFile1.getCanonicalPath());
+    }
+    
+    public void testNestedSortMultiQueryEndToEnd1() throws Exception{
+        pigServer.setBatchOn();
+        Util.copyFromLocalToCluster(cluster, 
+                "test/org/apache/pig/test/data/passwd",
+                "testNestedSortMultiQueryEndToEnd1-input.txt");
+        pigServer.registerQuery("a = load 'testNestedSortMultiQueryEndToEnd1-input.txt'" +
+                " using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+        pigServer.registerQuery("b = group a by uname parallel 2;");
+        pigServer.registerQuery("c = group a by gid parallel 2;");
+        pigServer.registerQuery("d = foreach b generate SUM(a.gid);");
+        pigServer.registerQuery("e = foreach c { f = order a by uid; generate group, f; };");
+        pigServer.registerQuery("store d into '/tmp/output1';");
+        pigServer.registerQuery("store e into '/tmp/output2';");
+        
+        List<ExecJob> jobs = pigServer.executeBatch();
+        for (ExecJob job : jobs) {
+            assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+        }
+        FileLocalizer.delete("/tmp/output1", pigServer.getPigContext());
+        FileLocalizer.delete("/tmp/output2", pigServer.getPigContext());
+        Util.deleteFile(cluster, "testNestedSortMultiQueryEndToEnd1-input.txt");
+    }
+}
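
In Pig Latin terms, the pattern these tests exercise is a nested ORDER or
DISTINCT inside a FOREACH on a grouped relation, which the
SecondaryKeyOptimizer rewrites to use the MapReduce secondary sort instead of
sorting inside the reducer. A minimal sketch (aliases and paths are
illustrative, not taken from any one test):

    A = LOAD 'input' AS (a0, a1, a2);
    B = GROUP A BY a0;
    C = FOREACH B { D = ORDER A BY a1; GENERATE group, D; };
    STORE C INTO 'output';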

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSkewedJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSkewedJoin.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSkewedJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestSkewedJoin.java Tue Nov 24 19:54:19 2009
@@ -138,7 +138,6 @@
 
     }
     
-    
     public void testSkewedJoinWithGroup() throws IOException{
         pigServer.registerQuery("A = LOAD '" + INPUT_FILE1 + "' as (id, name, n);");
         pigServer.registerQuery("B = LOAD '" + INPUT_FILE2 + "' as (id, name);");
@@ -290,17 +289,18 @@
       	        lineCount[key][i] ++;
       	    }
          }
+         
+         int fc = 0;
          for(int i=0; i<3; i++) {
-        	 int fc = 0;
         	 for(int j=0; j<7; j++) {
-        		 if (lineCount[i][j] > 0) {
+        	     if (lineCount[i][j] > 0) {
         			 fc ++;
         		 }
         	 }
-        	 // all three keys are skewed keys,
-        	 // check each key should appear in more than 1 part- file
-        	 assertTrue(fc > 1);
          }
+         // with three keys, fc is at least 3; fc > 3 means at least
+         // one (skewed) key appears in more than one part file
+         assertTrue(fc > 3);
     }
     
     public void testSkewedJoinNullKeys() throws IOException {
@@ -324,4 +324,72 @@
         return;
     }
     
+    public void testSkewedJoinOuter() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE5 + "' as (id,name);");
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("C = join A by id left, B by id using \"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+                pigServer.registerQuery("C = join A by id right, B by id using \"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+                pigServer.registerQuery("C = join A by id full, B by id using \"skewed\";");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+                
+                while(iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+        } catch(Exception e) {
+            System.out.println(e.getMessage());
+            e.printStackTrace();
+            fail("Should support outer join in skewed join");
+        }
+        return;
+    }
+    
+    // see PIG-1048
+    public void testSkewedJoinOneValue() throws IOException {
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE3 + "' as (id,name);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE3 + "' as (id,name);");
+        // Filter key with a single value
+
+        pigServer.registerQuery("C = FILTER A by id == 400;");
+        pigServer.registerQuery("D = FILTER B by id == 400;");
+
+        DataBag dbfrj = BagFactory.getInstance().newDefaultBag(), dbrj = BagFactory.getInstance().newDefaultBag();
+        {
+            pigServer.registerQuery("E = join C by id, D by id using \"skewed\";");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+                
+            while(iter.hasNext()) {
+                dbfrj.add(iter.next());
+            }
+        }
+        {
+            pigServer.registerQuery("E = join C by id, D by id;");
+            Iterator<Tuple> iter = pigServer.openIterator("E");
+
+            while(iter.hasNext()) {
+                dbrj.add(iter.next());
+            }
+        }
+        Assert.assertEquals(dbfrj.size(), dbrj.size());
+        Assert.assertTrue(TestHelper.compareBags(dbfrj, dbrj));
+    }
 }
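
For reference, the outer-join variants added in testSkewedJoinOuter keep the
usual outer-join syntax and only select the skewed partitioner via USING; a
sketch with illustrative input paths:

    A = LOAD 'left'  AS (id, name);
    B = LOAD 'right' AS (id, name);
    C = JOIN A BY id LEFT, B BY id USING "skewed";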

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Tue Nov 24 19:54:19 2009
@@ -17,16 +17,12 @@
  */
 package org.apache.pig.test;
 
-import static org.junit.Assert.assertEquals;
-
 import java.util.*;
 
 import org.apache.pig.ExecType;
 
-import java.io.File;
 import java.io.BufferedReader;
 import java.io.FileReader;
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
@@ -35,13 +31,11 @@
 import org.apache.pig.impl.plan.OperatorKey;
 
 import org.apache.pig.FuncSpec;
-import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DefaultBagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.DefaultTuple;
-import org.apache.pig.data.DataByteArray;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.PigServer;
@@ -52,11 +46,8 @@
 import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
 import org.apache.pig.test.utils.GenPhyOp;
 import org.apache.pig.test.utils.GenRandomData;
 import org.apache.pig.test.utils.TestHelper;
@@ -65,10 +56,7 @@
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
-import org.apache.pig.backend.datastorage.ContainerDescriptor;
 import org.apache.pig.backend.datastorage.DataStorage;
-import org.apache.pig.backend.datastorage.DataStorageException;
-import org.apache.pig.backend.datastorage.ElementDescriptor;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java?rev=883836&r1=883835&r2=883836&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestTypeCheckingValidator.java Tue Nov 24 19:54:19 2009
@@ -24,6 +24,7 @@
 import java.util.List;
 import java.util.ArrayList;
 
+import junit.framework.AssertionFailedError;
 import junit.framework.TestCase;
 
 import org.apache.pig.EvalFunc;
@@ -49,7 +50,18 @@
 
 public class TestTypeCheckingValidator extends TestCase {
 
-    LogicalPlanTester planTester = new LogicalPlanTester() ;
+    LogicalPlanTester planTester;
+    
+    /* (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Override
+    protected void setUp() throws Exception {
+        // create a new instance of the plan tester
+        // for each test so that different tests do not
+        // interact with each other's plans
+        planTester = new LogicalPlanTester() ;
+    }
     
 	private static final String simpleEchoStreamingCommand;
         static {
@@ -3287,77 +3299,19 @@
     }
 
     @Test
-    public void testCogroupStarLineageNoSchema() throws Throwable {
-        planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
-        planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
-        planTester.buildPlan("c = cogroup a by *, b by * ;") ;
-        planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
-        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group, $1 + 1, $2 + 2.0;") ;
-
-        // validate
-        CompilationMessageCollector collector = new CompilationMessageCollector() ;
-        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
-        try {
-            typeValidator.validate(plan, collector) ;
-        }
-        catch (PlanValidationException pve) {
-            //not good
-        }
-
-        printMessageCollector(collector) ;
-        printTypeGraph(plan) ;
-        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
-
-        if (collector.hasError()) {
-            throw new AssertionError("Expect no  error") ;
-        }
-
-
-        LOForEach foreach = (LOForEach)plan.getLeaves().get(0);
-        LogicalPlan foreachPlan = foreach.getForEachPlans().get(1);
-
-        LogicalOperator exOp = foreachPlan.getRoots().get(0);
-
-        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
-
-        LOCast cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
-        assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("BinStorage"));
-
-        foreachPlan = foreach.getForEachPlans().get(2);
-        exOp = foreachPlan.getRoots().get(0);
-        if(! (exOp instanceof LOProject)) exOp = foreachPlan.getRoots().get(1);
-        cast = (LOCast)foreachPlan.getSuccessors(exOp).get(0);
-        assertTrue(cast.getLoadFuncSpec().getClassName().startsWith("PigStorage"));
-
-    }
-
-    @Test
     public void testCogroupStarLineageNoSchemaFail() throws Throwable {
         planTester.buildPlan("a = load 'a' using BinStorage() ;") ;
         planTester.buildPlan("b = load 'b' using PigStorage() ;") ;
-        planTester.buildPlan("c = cogroup a by *, b by * ;") ;
-        planTester.buildPlan("d = foreach c generate group, flatten($1), flatten($2);") ;
-        LogicalPlan plan = planTester.buildPlan("e = foreach d generate group + 1, $1 + 1, $2 + 2.0;") ;
-
-        // validate
-        CompilationMessageCollector collector = new CompilationMessageCollector() ;
-        TypeCheckingValidator typeValidator = new TypeCheckingValidator() ;
+        boolean exceptionThrown = false;
         try {
-            typeValidator.validate(plan, collector) ;
-            fail("Exception expected") ;
-        }
-        catch (PlanValidationException pve) {
-            //not good
-        }
-
-        printMessageCollector(collector) ;
-        printTypeGraph(plan) ;
-        planTester.printPlan(plan, TypeCheckingTestUtil.getCurrentMethodName());
-
-        if (!collector.hasError()) {
-            throw new AssertionError("Expect error") ;
+            planTester.buildPlan("c = cogroup a by *, b by *;");
+        } catch(AssertionFailedError e) {
+            assertTrue(e.getMessage().contains("Cogroup/Group by * is only allowed if " +
+            "the input has a schema"));
+            exceptionThrown = true;
         }
-
+        assertTrue(exceptionThrown);
+        
     }
 
     @Test

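The corresponding positive case (a sketch, not part of this change set):
cogroup by * stays legal when every input carries a schema and the key
arities match:

    a = LOAD 'a' AS (f1: int, f2: double);
    b = LOAD 'b' AS (g1: int, g2: double);
    c = COGROUP a BY *, b BY *;
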
Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestUDFContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestUDFContext.java?rev=883836&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestUDFContext.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestUDFContext.java Tue Nov 24 19:54:19 2009
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Properties;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.BufferedPositionedInputStream;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.impl.util.UDFContext;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+
+public class TestUDFContext extends TestCase {
+    
+    static MiniCluster cluster = null;
+    
+    @Override 
+    protected void setUp() throws Exception {
+        cluster = MiniCluster.buildCluster();
+    }
+
+
+    @Test
+    public void testUDFContext() throws Exception {
+        Util.createInputFile(cluster, "a.txt", new String[] { "dumb" });
+        Util.createInputFile(cluster, "b.txt", new String[] { "dumber" });
+        FileLocalizer.deleteTempFiles();
+        PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+        String[] statement = { "A = LOAD 'a.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('joe');",
+            "B = LOAD 'b.txt' USING org.apache.pig.test.utils.UDFContextTestLoader('jane');",
+            "C = union A, B;",
+            "D = FOREACH C GENERATE $0, $1, org.apache.pig.test.utils.UDFContextTestEvalFunc($0), org.apache.pig.test.utils.UDFContextTestEvalFunc2($0);" };
+
+        File tmpFile = File.createTempFile("temp_jira_851", ".pig");
+        FileWriter writer = new FileWriter(tmpFile);
+        for (String line : statement) {
+            writer.write(line + "\n");
+        }
+        writer.close();
+        
+        pig.registerScript(tmpFile.getAbsolutePath());
+        Iterator<Tuple> iterator = pig.openIterator("D");
+        while (iterator.hasNext()) {
+            Tuple tuple = iterator.next();
+            if ("dumb".equals(tuple.get(0).toString())) {
+                assertEquals(tuple.get(1).toString(), "joe");
+            } else if ("dumber".equals(tuple.get(0).toString())) {
+                assertEquals(tuple.get(1).toString(), "jane");
+            }
+            assertEquals(Integer.valueOf(5), Integer.valueOf(tuple.get(2).toString()));
+            assertEquals("five", tuple.get(3).toString());
+        }
+    }
+}