You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by pr...@apache.org on 2010/02/18 23:20:09 UTC

svn commit: r911616 [7/7] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/src/main/java/org/apache/pig/piggybank/evaluation/util/ contrib/piggybank/java/src/test/java/org/apache/pig/piggybank/test/evaluation/util/ src/org/apach...

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalRule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalRule.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalRule.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestExperimentalRule.java Thu Feb 18 22:20:07 2010
@@ -56,6 +56,11 @@
             super();
         }
 
+        @Override
+        public boolean isEqual(OperatorPlan other) { // test stub: performs no comparison of the two plans
+            return false; // unconditionally "not equal", whatever plan is passed in
+        }
+
     }
     
     private static class OP extends Operator {
@@ -66,6 +71,11 @@
         public void accept(PlanVisitor v) {
             
         }
+        /** Test stub: performs no comparison and reports every operator as not equal. */
+        @Override
+        public boolean isEqual(Operator operator) {
+            return false;
+        }
     }
     
     private static class OP_Load extends OP {
@@ -100,6 +110,7 @@
 
     
     OperatorPlan plan = null;
+    Operator join;
     
     public void setUp() {
         plan = new SillyPlan();
@@ -134,7 +145,47 @@
         plan.connect(t1, f3);
         plan.connect(t1, f4);
         plan.connect(f3, s1);
-        plan.connect(f4, s2);
+        plan.connect(f4, s2); 
+        
+        join = j1;
+    }
+    
+    /** Attaches a load -> filter branch to the stored join, then expects exactly one match of a (load, filter) -> join pattern. */
+    public void testMultiNode() throws Exception {    
+        //         load --|-join - filter - filter - split |- filter - store
+        //         load --|      
+        // load -- filter-|      <- branch built below: l3 -> f5 -> join
+        Operator l3 = new OP_Load("p3", plan);
+        Operator f5 = new OP_Filter("f5", plan);
+        plan.add(l3);
+        plan.add(f5);
+        plan.connect(l3, f5);
+        // hook the new branch into the operator kept in the 'join' field
+         plan.connect(f5, join);
+        // pattern to search for: one load and one filter that both feed the same join
+        
+         OperatorPlan pattern = new SillyPlan();
+         Operator op1 = new OP_Load("mmm1", pattern);
+         Operator op2 = new OP_Filter("mmm2", pattern);
+         Operator op3 = new OP_Join("mmm3", pattern);
+         pattern.add(op1);
+         pattern.add(op2);
+         pattern.add(op3);
+         pattern.connect(op1, op3);
+         pattern.connect(op2, op3);
+         // run the matcher: exactly one sub-plan is expected, holding three operators with the join as its only sink
+         Rule r = new SillyRule("basic", pattern);
+         List<OperatorPlan> l = r.match(plan);
+         assertEquals(1, l.size());
+         OperatorPlan match = l.get(0);
+         assertEquals(match.size(), 3); // NOTE(review): here and below the arguments are (actual, expected); JUnit's order is (expected, actual)
+         assertEquals(match.getSinks().size(), 1);
+         assertEquals(match.getSinks().get(0), join);
+         // two distinct sources; NOTE(review): each check accepts "any OP_Load or f5", so two different loads would also pass
+         assertEquals(match.getSources().size(), 2);
+         assertTrue(match.getSources().get(0).getClass().equals(OP_Load.class) || match.getSources().get(0).equals(f5) );
+         assertTrue(match.getSources().get(1).getClass().equals(OP_Load.class) || match.getSources().get(1).equals(f5) );
+         assertNotSame(match.getSources().get(0), match.getSources().get(1));
     }
     
     public void testSingleNodeMatch() {
@@ -146,11 +197,11 @@
         List<OperatorPlan> l = r.match(plan);
         assertEquals(l.size(), 2);
         
-        Operator m1 = l.get(0).getRoots().get(0);
+        Operator m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("p1") || m1.getName().equals("p2"));
         assertEquals(l.get(0).size(), 1);
         
-        Operator m2 = l.get(1).getRoots().get(0);
+        Operator m2 = l.get(1).getSources().get(0);
         assertTrue(m2.getName().equals("p1") || m2.getName().equals("p2"));
         assertEquals(l.get(1).size(), 1);
         assertNotSame(m1.getName(), m2.getName());
@@ -162,12 +213,12 @@
         l = r.match(plan);
         assertEquals(l.size(), 4);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("f1") || m1.getName().equals("f2") 
                 || m1.getName().equals("f3") || m1.getName().equals("f4"));
         assertEquals(l.get(0).size(), 1);
         
-        m2 = l.get(1).getRoots().get(0);
+        m2 = l.get(1).getSources().get(0);
         assertTrue(m1.getName().equals("f1") || m1.getName().equals("f2") 
                 || m1.getName().equals("f3") || m1.getName().equals("f4"));
         assertEquals(l.get(1).size(), 1);
@@ -180,11 +231,11 @@
         l = r.match(plan);
         assertEquals(l.size(), 2);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("s1") || m1.getName().equals("s2"));
         assertEquals(l.get(0).size(), 1);
         
-        m2 = l.get(1).getRoots().get(0);
+        m2 = l.get(1).getSources().get(0);
         assertTrue(m2.getName().equals("s1") || m2.getName().equals("s2"));
         assertEquals(l.get(1).size(), 1);
         assertNotSame(m1.getName(), m2.getName());
@@ -196,7 +247,7 @@
         l = r.match(plan);
         assertEquals(l.size(), 1);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("t1"));
         assertEquals(l.get(0).size(), 1);
         
@@ -207,7 +258,7 @@
         l = r.match(plan);
         assertEquals(l.size(), 1);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("j1"));
         assertEquals(l.get(0).size(), 1);
       
@@ -223,13 +274,13 @@
         List<OperatorPlan> l = r.match(plan);
         assertEquals(l.size(), 1);
         
-        assertEquals(l.get(0).getRoots().size(), 2);
-        assertEquals(l.get(0).getLeaves().size(), 2);
+        assertEquals(l.get(0).getSources().size(), 2);
+        assertEquals(l.get(0).getSinks().size(), 2);
         assertEquals(l.get(0).size(), 2);
         
-        Operator m1 = l.get(0).getRoots().get(0);
+        Operator m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("p1") || m1.getName().equals("p2"));
-        Operator m2 = l.get(0).getRoots().get(1);
+        Operator m2 = l.get(0).getSources().get(1);
         assertTrue(m2.getName().equals("p1") || m2.getName().equals("p2"));       
         assertNotSame(m1.getName(), m2.getName());
        
@@ -246,13 +297,13 @@
         l = r.match(plan);
         assertEquals(l.size(), 1);
         
-        assertEquals(l.get(0).getRoots().size(), 1);
-        assertEquals(l.get(0).getLeaves().size(), 1);
+        assertEquals(l.get(0).getSources().size(), 1);
+        assertEquals(l.get(0).getSinks().size(), 1);
         assertEquals(l.get(0).size(), 2);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("j1"));
-        m2 = l.get(0).getLeaves().get(0);
+        m2 = l.get(0).getSinks().get(0);
         assertTrue(m2.getName().equals("f1"));       
        
   
@@ -268,8 +319,8 @@
         l = r.match(plan);
         assertEquals(2, l.size());
         
-        assertEquals(l.get(0).getRoots().size(), 1);
-        assertEquals(l.get(0).getLeaves().size(), 1);                     
+        assertEquals(l.get(0).getSources().size(), 1);
+        assertEquals(l.get(0).getSinks().size(), 1);                     
         
         // search for 2 loads, then join
         pattern = new SillyPlan();
@@ -301,15 +352,15 @@
         l = r.match(plan);
         assertEquals(1, l.size());
         
-        assertEquals(l.get(0).getRoots().size(), 1);
-        assertEquals(l.get(0).getLeaves().size(), 2);
+        assertEquals(l.get(0).getSources().size(), 1);
+        assertEquals(l.get(0).getSinks().size(), 2);
         assertEquals(l.get(0).size(), 3);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("t1"));
-        m2 = l.get(0).getLeaves().get(0);
+        m2 = l.get(0).getSinks().get(0);
         assertTrue(m2.getName().equals("f3") || m2.getName().equals("f4"));    
-        m2 = l.get(0).getLeaves().get(1);
+        m2 = l.get(0).getSinks().get(1);
         assertTrue(m2.getName().equals("f3") || m2.getName().equals("f4"));    
     }
    

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestGrunt.java Thu Feb 18 22:20:07 2010
@@ -811,7 +811,7 @@
         PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         PigContext context = server.getPigContext();
 
-        String strCmd = "register 'pig.jar'\n";
+        String strCmd = "register 'pig-withouthadoop.jar'\n";
 
         ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
         InputStreamReader reader = new InputStreamReader(cmd);
@@ -819,14 +819,14 @@
         Grunt grunt = new Grunt(new BufferedReader(reader), context);
 
         grunt.exec();
-        assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig.jar")));
+        assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig-withouthadoop.jar")));
     }
     
     public void testRegisterWithoutQuotes() throws Throwable {
         PigServer server = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         PigContext context = server.getPigContext();
 
-        String strCmd = "register pig.jar\n";
+        String strCmd = "register pig-withouthadoop.jar\n";
 
         ByteArrayInputStream cmd = new ByteArrayInputStream(strCmd.getBytes());
         InputStreamReader reader = new InputStreamReader(cmd);
@@ -834,6 +834,6 @@
         Grunt grunt = new Grunt(new BufferedReader(reader), context);
 
         grunt.exec();
-        assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig.jar")));
+        assertTrue(context.extraJars.contains(ClassLoader.getSystemResource("pig-withouthadoop.jar")));
     }
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestJoin.java Thu Feb 18 22:20:07 2010
@@ -98,6 +98,40 @@
     }
 
     @Test
+    public void testJoinWithMissingFieldsInTuples() throws IOException{
+        // Joins a mostly-empty input with an all-empty input on their first field and expects no output rows.
+        setUp(ExecType.MAPREDUCE);
+        String[] input1 = {
+                "ff ff ff",
+                "",
+                "",
+                "",
+                "",
+                "ff ff ff",
+                "",
+                ""
+                };
+        String[] input2 = {
+                "",
+                "",
+                "",
+                "",
+                ""
+                };
+        // a.txt is loaded with a space delimiter, b.txt with a control-character delimiter
+        String firstInput = createInputFile(ExecType.MAPREDUCE, "a.txt", input1);
+        String secondInput = createInputFile(ExecType.MAPREDUCE, "b.txt", input2);
+        String script = "a = load 'a.txt'  using PigStorage(' ');" +
+        "b = load 'b.txt'  using PigStorage('\u0001');" +
+        "c = join a by $0, b by $0;";
+        Util.registerMultiLineQuery(pigServer, script);
+        Iterator<Tuple> it = pigServer.openIterator("c");
+        assertFalse(it.hasNext()); // the join must be empty (presumably the empty lines yield null keys that never match; confirm)
+        deleteInputFile(ExecType.MAPREDUCE, firstInput); // NOTE(review): cleanup is skipped if anything above throws
+        deleteInputFile(ExecType.MAPREDUCE, secondInput);
+    }
+    
+    @Test
     public void testJoinUnkownSchema() throws Exception {
         // If any of the input schema is unknown, the resulting schema should be unknown as well
         for (ExecType execType : execTypes) {

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalRearrange.java Thu Feb 18 22:20:07 2010
@@ -195,22 +195,29 @@
         try {
             PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
             w.println("10\t2\t3");
+            w.println("10\t4\t5");
+            w.println("20\t3000\t2");
+            w.println("20\t4000\t3");
             w.println("20\t3\t");
+            w.println("21\t4\t");
+            w.println("22\t5\t");
             w.close();
             Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
 
             PigServer myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
 
             myPig.registerQuery("data = load '" + INPUT_FILE + "' as (a0, a1, a2);");
-            myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double) a1) > .001 OR a0 < 11 ? a0 : -1);");
+            myPig.registerQuery("grp = GROUP data BY (((double) a2)/((double) a1) > .001 OR a0 < 11 ? a0 : 0);");
+            myPig.registerQuery("res = FOREACH grp GENERATE group, SUM(data.a1), SUM(data.a2);");
             
             List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
-                    new String[] { 
-                            "(10,{(10,2,3)})",
-                            "(null,{(20,3,null)})"
+                    new String[] {   
+                            "(0,7000.0,5.0)",
+                            "(10,6.0,8.0)",                            
+                            "(null,12.0,null)"
                     });
             
-            Iterator<Tuple> iter = myPig.openIterator("grp");
+            Iterator<Tuple> iter = myPig.openIterator("res");
             int counter = 0;
             while (iter.hasNext()) {
                 assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      

Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java?rev=911616&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java Thu Feb 18 22:20:07 2010
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.CastExpression;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.optimizer.UidStamper;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.TestCase;
+
+public class TestLogicalPlanMigrationVisitor extends TestCase {
+    /** Migrates a load -> filter -> store plan and checks the operator chain and the filter's equality expression. */
+    public void testSimplePlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("b = filter a by $0==NULL;");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        
+        // migrate, then check basics: three operators and a single source
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        assertEquals(3, newPlan.size());
+        assertEquals(newPlan.getSources().size(), 1);
+        
+        // check load: it is the only source of the migrated plan
+        LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSources().get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+        
+        // check filter: first successor of the load
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class);
+        
+        LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan();
+        // the expression plan's source is the equality; its lhs must project column 0 of input 0
+        EqualExpression eq = (EqualExpression)exp.getSources().get(0);
+        assertEquals(eq.getLhs().getClass(), ProjectExpression.class);
+        assertEquals(((ProjectExpression)eq.getLhs()).getColNum(), 0);
+        assertEquals(((ProjectExpression)eq.getLhs()).getInputNum(), 0);
+        // the rhs must be a constant; only its class is checked, not its value
+        assertEquals(eq.getRhs().getClass(), ConstantExpression.class);
+        
+        // check store: first successor of the filter
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class);
+    }
+    /** Migrates a plan whose filter compares a cast column with a constant and checks the cast -> project expression chain. */
+    public void testPlanWithCast() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, c);");
+        lpt.buildPlan("b = filter a by (int)id==10;");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        
+        // migrate, then check basics: three operators and a single source
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        assertEquals(3, newPlan.size());
+        assertEquals(newPlan.getSources().size(), 1);
+        
+        // check load: it is the only source of the migrated plan
+        LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSources().get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+        
+        // check filter: first successor of the load
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class);
+        
+        LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan();
+        // the expression plan's source is the equality; its lhs must be the cast
+        EqualExpression eq = (EqualExpression)exp.getSources().get(0);
+        assertEquals(eq.getLhs().getClass(), CastExpression.class);
+        // NOTE(review): the next assertion repeats the one directly above
+        assertEquals(eq.getLhs().getClass(), CastExpression.class);
+        LogicalExpression ep = (LogicalExpression)exp.getSuccessors(eq.getLhs()).get(0); // first successor of the cast in the expression plan
+        assertEquals(ep.getClass(), ProjectExpression.class);
+        assertEquals(((ProjectExpression)ep).getColNum(), 0);
+        assertEquals(((ProjectExpression)ep).getInputNum(), 0);
+        // the rhs must be a constant; only its class is checked, not its value
+        assertEquals(eq.getRhs().getClass(), ConstantExpression.class);
+        
+        // check store: first successor of the filter
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class);
+    }
+    /** Migrates a two-load join followed by an AND filter and a store, checking join inputs, join keys and filter projections. */
+    public void testJoinPlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd1.txt' as (id, c);");
+        lpt.buildPlan("b = load 'd2.txt'as (id, c);");
+        lpt.buildPlan("c = join a by id, b by c;");
+        lpt.buildPlan("d = filter c by a::id==NULL AND b::c==NULL;");        
+        LogicalPlan plan = lpt.buildPlan("store d into 'empty';");
+      
+        // migrate, then check basics: five operators and two sources
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        assertEquals(5, newPlan.size());
+        assertEquals(newPlan.getSources().size(), 2);
+       
+        // check load and join: the first source's successor is a HASH join whose predecessors are loads 'a' then 'b'
+        LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSuccessors(newPlan.getSources().get(0)).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOJoin.class);
+        assertEquals(((LOJoin)op).getJoinType(), LOJoin.JOINTYPE.HASH);
+        
+        LogicalRelationalOperator l1 = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(0);
+        assertEquals(l1.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+        assertEquals(l1.getAlias(), "a");
+        
+        LogicalRelationalOperator l2 = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(1);
+        assertEquals(l2.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+        assertEquals(l2.getAlias(), "b");
+
+        // check join input plans: each input has a single-node key plan
+        LogicalExpressionPlan p1 = ((LOJoin)op).getJoinPlan(0).iterator().next();
+        assertEquals(p1.size(), 1);
+        
+        ProjectExpression prj = (ProjectExpression)p1.getSources().get(0);
+        // "a by id": column 0 of input 0
+        assertEquals(prj.getInputNum(), 0);
+        assertEquals(prj.getColNum(), 0);
+        
+        LogicalExpressionPlan p2 = ((LOJoin)op).getJoinPlan(1).iterator().next();
+        assertEquals(p2.size(), 1);
+        
+        prj = (ProjectExpression)p2.getSources().get(0);
+        // "b by c": column 1 of input 1
+        assertEquals(prj.getInputNum(), 1);
+        assertEquals(prj.getColNum(), 1);
+        
+        // check filter: first successor of the join, with an AND at the top of its expression plan
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class);        
+        LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan();
+        
+        AndExpression ae = (AndExpression)exp.getSources().get(0);
+        // first operand of the AND: projection of column 0 compared with a constant
+        EqualExpression eq = (EqualExpression)exp.getSuccessors(ae).get(0);
+        assertEquals(eq.getLhs().getClass(), ProjectExpression.class);
+        assertEquals(((ProjectExpression)eq.getLhs()).getColNum(), 0);
+        assertEquals(((ProjectExpression)eq.getLhs()).getInputNum(), 0);
+        
+        assertEquals(eq.getRhs().getClass(), ConstantExpression.class);
+        // second operand: column 3 of input 0 (presumably b::c in the joined schema a::id, a::c, b::id, b::c; confirm)
+        eq = (EqualExpression)exp.getSuccessors(ae).get(1);
+        assertEquals(eq.getLhs().getClass(), ProjectExpression.class);
+        assertEquals(((ProjectExpression)eq.getLhs()).getColNum(), 3);
+        assertEquals(((ProjectExpression)eq.getLhs()).getInputNum(), 0);
+        
+        assertEquals(eq.getRhs().getClass(), ConstantExpression.class);
+        
+        // check store: first successor of the filter
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class); 
+    }
+    /** Migrates load -> foreach(generate id, FLATTEN(d)) -> store and compares it with a hand-built expected plan via isEqual. */
+    public void testForeachPlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (id, d);");
+        lpt.buildPlan("b = foreach a generate id, FLATTEN(d);");        
+        LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
+        
+        // migrate the old-style plan; it is compared with the hand-built 'expected' plan further down
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan expected = 
+            new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        // expected load: PigStorage on file:///test/d.txt with schema (id, d), both bytearray
+        LogicalSchema aschema = new LogicalSchema();    	
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("d", null, DataType.BYTEARRAY));
+        LOLoad load = new LOLoad(new FileSpec("file:///test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected);
+        expected.add(load);
+        // expected foreach: its inner plan has two inner loads (columns 0 and 1) feeding one generate
+        LOForEach foreach = new LOForEach(expected);
+        org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        LOInnerLoad l1 = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(l1);
+        LOInnerLoad l2 = new LOInnerLoad(innerPlan, foreach, 1);
+        // NOTE(review): l2 is never passed to innerPlan.add(...) the way l1 is; confirm connect(l2, gen) below is sufficient
+        List<LogicalExpressionPlan> eps = new ArrayList<LogicalExpressionPlan>();
+        LogicalExpressionPlan p1 = new LogicalExpressionPlan();
+        p1.add(new ProjectExpression(p1, DataType.BYTEARRAY, 0, 0));
+        LogicalExpressionPlan p2 = new LogicalExpressionPlan();
+        p2.add(new ProjectExpression(p2, DataType.BYTEARRAY, 1, 0));
+        eps.add(p1);
+        eps.add(p2);
+        // flags {false, true}: presumably only the second generated expression is flattened, matching FLATTEN(d); confirm
+        LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true});
+        innerPlan.add(gen);
+        innerPlan.connect(l1, gen);
+        innerPlan.connect(l2, gen);
+        
+        foreach.setInnerPlan(innerPlan);    	
+        expected.add(foreach);
+        // expected store: PigStorage on file:///test/empty
+        LOStore s = new LOStore(expected, new FileSpec("file:///test/empty", new FuncSpec("org.apache.pig.builtin.PigStorage")));
+      
+        expected.add(s);
+        
+        expected.connect(load, foreach);
+        expected.connect(foreach, s);
+        // run UidStamper over the expected plan (migratePlan() does the same for the migrated one); failures are rethrown as VisitorException
+        try {
+            UidStamper stamper = new UidStamper(expected);
+            stamper.visit();         
+        }catch(Exception e) {
+            throw new VisitorException(e);
+        }
+
+        assertTrue(expected.isEqual(newPlan));
+        // schema of the expected foreach must be (id, d), both bytearray
+        LogicalSchema schema = foreach.getSchema();
+        aschema = new LogicalSchema();
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("d", null, DataType.BYTEARRAY));
+        assertTrue(schema.isEqual(aschema));
+    }
+    /** Checks the schema of the migrated plan's sink for a foreach that flattens a tuple and for one that passes a bag through. */
+    public void testForeachSchema() throws Exception {
+        // test flatten: d is a tuple (v, s), so the expected output schema is (id, v, s)
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (id, d:tuple(v, s));");
+        LogicalPlan plan = lpt.buildPlan("b = foreach a generate id, FLATTEN(d);");  
+                
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSinks().get(0);
+        // expected: three bytearray fields id, v, s
+        LogicalSchema s2 = new LogicalSchema();
+        s2.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        s2.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+        s2.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+        assertTrue(s2.isEqual(op.getSchema()));
+        
+        // test no flatten: the bag column d must keep its nested schema
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (id, d:bag{t:(v, s)});");
+        plan = lpt.buildPlan("b = foreach a generate id, d;");  
+                
+        newPlan = migratePlan(plan);
+        op = (LogicalRelationalOperator)newPlan.getSinks().get(0);
+        // expected: (id: bytearray, d: bag{t: tuple(v: bytearray, s: bytearray)}); aschema3 is the tuple, aschema2 the bag content
+        LogicalSchema aschema = new LogicalSchema();    	
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        LogicalSchema aschema2 = new LogicalSchema();
+        LogicalSchema aschema3 = new LogicalSchema();
+        aschema3.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+        aschema3.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+        aschema2.addField(new LogicalSchema.LogicalFieldSchema("t", aschema3, DataType.TUPLE));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("d", aschema2, DataType.BAG));  
+        
+        assertTrue(aschema.isEqual(op.getSchema()));
+    }
+    /** Like testForeachPlan, but the flattened column is a bag of (v, s) tuples, so the expected foreach schema is (id, v, s). */
+    public void testForeachPlan2() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (id, d:bag{t:(v, s)});");
+        lpt.buildPlan("b = foreach a generate id, FLATTEN(d);");        
+        LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
+        
+        // migrate the old-style plan; it is compared with the hand-built 'expected' plan further down
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan expected = 
+            new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        // load schema: (id: bytearray, d: bag{t: tuple(v: bytearray, s: bytearray)})
+        LogicalSchema aschema = new LogicalSchema();    	
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        LogicalSchema aschema2 = new LogicalSchema();
+        LogicalSchema aschema3 = new LogicalSchema();
+        aschema3.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+        aschema3.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+        aschema2.addField(new LogicalSchema.LogicalFieldSchema("t", aschema3, DataType.TUPLE));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("d", aschema2, DataType.BAG));        
+        
+        LOLoad load = new LOLoad(new FileSpec("file:///test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected);
+        expected.add(load);         
+        // expected foreach: its inner plan has two inner loads (columns 0 and 1) feeding one generate
+        LOForEach foreach2 = new LOForEach(expected);
+        org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        LOInnerLoad l1 = new LOInnerLoad(innerPlan, foreach2, 0);
+        innerPlan.add(l1);
+        LOInnerLoad l2 = new LOInnerLoad(innerPlan, foreach2, 1);
+        // NOTE(review): l2 is never passed to innerPlan.add(...) the way l1 is; confirm connect(l2, gen) below is sufficient
+        List<LogicalExpressionPlan>  eps = new ArrayList<LogicalExpressionPlan>();
+        LogicalExpressionPlan p1 = new LogicalExpressionPlan();
+        new ProjectExpression(p1, DataType.BYTEARRAY, 0, 0); // NOTE(review): result discarded, no p1.add(...) as in testForeachPlan; presumably the constructor registers itself with p1 (confirm)
+        LogicalExpressionPlan p2 = new LogicalExpressionPlan();        
+        new ProjectExpression(p2, DataType.BAG, 1, 0); // NOTE(review): same pattern as above, for the bag column
+        eps.add(p1);
+        eps.add(p2);
+        // flags {false, true}: presumably only the second generated expression is flattened, matching FLATTEN(d); confirm
+        LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true});
+        innerPlan.add(gen);
+        innerPlan.connect(l1, gen);
+        innerPlan.connect(l2, gen);
+        
+        foreach2.setInnerPlan(innerPlan);    	
+        expected.add(foreach2); 
+        // expected store: PigStorage on file:///test/empty
+        LOStore s = new LOStore(expected, new FileSpec("file:///test/empty", new FuncSpec("org.apache.pig.builtin.PigStorage")));
+      
+        expected.add(s);
+        
+        expected.connect(load, foreach2);
+    
+        expected.connect(foreach2, s);
+        try {
+            UidStamper stamper = new UidStamper(expected); // run UidStamper over the expected plan, as migratePlan() does for the migrated one
+            stamper.visit();         
+        }catch(Exception e) {
+            throw new VisitorException(e);
+        }
+        
+        assertTrue(expected.isEqual(newPlan));
+        // schema of the expected foreach must be (id, v, s), all bytearray
+        LogicalSchema schema = foreach2.getSchema();
+        aschema = new LogicalSchema();
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+        assertTrue(schema.isEqual(aschema));
+    }
+    /** Runs LogicalPlanMigrationVistor over the given plan and returns the plan it produced after a UidStamper pass over it. */
+    private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{
+        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);    	
+        visitor.visit();
+        // fetch the plan the visitor built while visiting lp
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+        try {
+            UidStamper stamper = new UidStamper(newPlan);
+            stamper.visit();
+            // any exception raised in this try block is rethrown below wrapped in a VisitorException
+            return newPlan;
+        }catch(Exception e) {
+            throw new VisitorException(e);
+        }
+    }    
+}

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java Thu Feb 18 22:20:07 2010
@@ -108,6 +108,66 @@
         myPig = null;
     }
 
+    public void testMultiQueryJiraPig1169() {
+        // Test case for the issue named in the method (Pig JIRA 1169):
+        // problems with some top N queries. Stores both an ORDER BY result and a
+        // LIMIT 2 over it in one batch, then verifies the limited output.
+        String INPUT_FILE = "abc";
+        
+        try {
+            // Write five tab-separated rows to a local file and copy it to the cluster.
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("1\t2\t3");
+            w.println("2\t3\t4");
+            w.println("3\t4\t5");
+            w.println("5\t6\t7");
+            w.println("6\t7\t8");
+            w.close();
+    
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+            // Queue the ordered relation and its top-2 limit as two stores in one batch.
+            myPig.setBatchOn();
+    
+            myPig.registerQuery("A = load '" + INPUT_FILE 
+                    + "' as (a:int, b, c);");
+            myPig.registerQuery("A1 = Order A by a desc parallel 3;");
+            myPig.registerQuery("A2 = limit A1 2;");
+            myPig.registerQuery("store A1 into '/tmp/input1';");
+            myPig.registerQuery("store A2 into '/tmp/input2';");
+            // NOTE(review): '/tmp/input1' and '/tmp/input2' are not removed in this method - confirm cleanup elsewhere.
+            myPig.executeBatch();
+            // Read the LIMIT output back; expect the two rows with the largest 'a', descending.
+            myPig.registerQuery("B = load '/tmp/input2' as (a:int, b, c);");
+            
+            Iterator<Tuple> iter = myPig.openIterator("B");
+
+            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "(6,7,8)",
+                            "(5,6,7)"
+                    });
+            // Compare row by row via toString(), then check the row count matched too.
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            }
+
+            assertEquals(expectedResults.size(), counter);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
+  
     public void testMultiQueryJiraPig1171() {
 
         // test case: Problems with some top N queries

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java?rev=911616&r1=911615&r2=911616&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPigContext.java Thu Feb 18 22:20:07 2010
@@ -149,12 +149,16 @@
                 
         // generate jar file
         String jarName = "TestUDFJar.jar";
+        String jarFile = tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName;
         status = Util.executeShellCommand("jar -cf " + tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName + 
                               " -C " + tmpDir.getAbsolutePath() + " " + "com");
         assertTrue(status==0);
+        Properties properties = cluster.getProperties();
+        PigContext pigContext = new PigContext(ExecType.MAPREDUCE, properties);
         
-        PigServer pig = new PigServer(pigContext);
-        pig.registerJar(tmpDir.getAbsolutePath() + FILE_SEPARATOR + jarName);
+        //register jar using properties
+        pigContext.getProperties().setProperty("pig.additional.jars", jarFile);
+        PigServer pigServer = new PigServer(pigContext);
 
         PigContext.initializeImportList("com.xxx.udf1:com.xxx.udf2.");
         ArrayList<String> importList = PigContext.getPackageImportList();
@@ -180,7 +184,6 @@
         }
         Util.createInputFile(cluster, tmpFile.getCanonicalPath(), input);        
         FileLocalizer.deleteTempFiles();
-        PigServer pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pigServer.registerQuery("A = LOAD '" + tmpFile.getCanonicalPath() + "' using TestUDF2() AS (num:chararray);");
         pigServer.registerQuery("B = foreach A generate TestUDF1(num);");
         Iterator<Tuple> iter = pigServer.openIterator("B");