Posted to commits@pig.apache.org by pr...@apache.org on 2010/01/08 19:17:12 UTC

svn commit: r897283 [5/5] - in /hadoop/pig/branches/load-store-redesign: ./ contrib/piggybank/java/ contrib/zebra/ contrib/zebra/src/java/org/apache/hadoop/zebra/pig/ contrib/zebra/src/java/org/apache/hadoop/zebra/types/ contrib/zebra/src/test/e2e/merg...

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POSplitOutput.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.physicalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * POSplitOutput reads from POSplit using an iterator and emits only the
+ * tuples that satisfy the split output's condition plan.
+ */
+public class POSplitOutput extends PhysicalOperator {
+
+    private static final long serialVersionUID = 1L;
+    
+    PhysicalOperator compOp;
+    PhysicalPlan compPlan;
+    transient Iterator<Tuple> it;
+    
+    public POSplitOutput(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+	super(k, rp, inp);
+    }
+
+    public POSplitOutput(OperatorKey k, int rp) {
+	super(k, rp);
+    }
+
+    public POSplitOutput(OperatorKey k, List<PhysicalOperator> inp) {
+	super(k, inp);
+    }
+
+    public POSplitOutput(OperatorKey k) {
+	super(k);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+	// no-op: there is nothing for a plan visitor to do here
+    }
+    
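+    /**
+     * On first use, pulls the tuple iterator from the upstream POSplit;
+     * thereafter, evaluates the condition plan on each tuple and returns
+     * those for which it is true.
+     */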
+    @SuppressWarnings("unchecked")
+    public Result getNext(Tuple t) throws ExecException {
+	if(it == null) {
+	    // fetch the tuple iterator from the upstream POSplit on first call
+	    Result res = getInputs().get(0).getNext(t);
+	    if(res.returnStatus != POStatus.STATUS_OK)
+		return res;
+	    it = (Iterator<Tuple>) res.result;
+	}
+	Result res = null;
+	Result inp = new Result();
+	while(true) {
+	    if(it.hasNext())
+		inp.result = it.next();
+	    else {
+		inp.returnStatus = POStatus.STATUS_EOP;
+		return inp;
+	    }
+	    inp.returnStatus = POStatus.STATUS_OK;
+
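+	    // evaluate the split's condition plan on the current tuple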
+	    compPlan.attachInput((Tuple) inp.result);
+
+	    res = compOp.getNext(dummyBool);
+	    if (res.returnStatus != POStatus.STATUS_OK 
+		    && res.returnStatus != POStatus.STATUS_NULL) 
+		return res;
+
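+	    // pass the tuple through only if the condition evaluated to true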
+	    if (res.result != null && (Boolean) res.result) {
+		if(lineageTracer != null) {
+		    ExampleTuple tIn = (ExampleTuple) inp.result;
+		    lineageTracer.insert(tIn);
+		    lineageTracer.union(tIn, tIn);
+		}
+		return inp;
+	    }
+	}
+    }
+
+    @Override
+    public String name() {
+	return "POSplitOutput " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+	return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+	return false;
+    }
+    
+    public void setPlan(PhysicalPlan compPlan) {
+	this.compPlan = compPlan;
+	this.compOp = compPlan.getLeaves().get(0);
+    }
+
+}

Added: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java (added)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/pen/physicalOperators/POStreamLocal.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.pen.physicalOperators;
+
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
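+/**
+ * Local-execution version of POStream. Unlike the Map-Reduce implementation
+ * there is no push model: an EOP from the predecessor marks the end of input
+ * to the streaming binary.
+ */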
+public class POStreamLocal extends POStream {
+
+    private static final long serialVersionUID = 2L;
+
+    public POStreamLocal(OperatorKey k, ExecutableManager executableManager,
+            StreamingCommand command, Properties properties) {
+        super(k, executableManager, command, properties);
+    }
+    
+    
+    /**
+     * This differs from the Map-Reduce implementation of POStream in that
+     * there is no push model here. POStatus.STATUS_EOP signals the end of
+     * input and is used to decide when to close stdin of the streaming process.
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        // The POStream Operator works with ExecutableManager to
+        // send input to the streaming binary and to get output
+        // from it. To achieve a tuple oriented behavior, two queues
+        // are used - one for output from the binary and one for 
+        // input to the binary. In each getNext() call:
+        // 1) If there is no more output expected from the binary, an EOP is
+        // sent to successor
+        // 2) If there is any output from the binary in the queue, it is passed
+        // down to the successor
+        // 3) If neither of these is true and it is still possible to
+        // send input to the binary, the next tuple is fetched from the
+        // predecessor and passed to the binary
+        try {
+            // if we are being called AFTER all output from the streaming 
+            // binary has already been sent to us then just return EOP
+            // The "allOutputFromBinaryProcessed" flag is set when we see
+            // an EOS (End of Stream output) from streaming binary
+            if(allOutputFromBinaryProcessed) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+            
+            // if we are here AFTER all map() calls have been completed
+            // AND AFTER we process all possible input to be sent to the
+            // streaming binary, then all we want to do is read output from
+            // the streaming binary
+            if(allInputFromPredecessorConsumed) {
+                Result r = binaryOutputQueue.take();
+                if(r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, it means all output
+                    // from the streaming binary has been sent to us
+                    // So we can send an EOP to the successor in
+                    // the pipeline. Also since we are being called
+                    // after all input from predecessor has been processed
+                    // it means we got here from a call from close() in
+                    // map or reduce. So once we send this EOP down, 
+                    // getNext() in POStream should never be called. So
+                    // we don't need to set any flag noting we saw all output
+                    // from binary
+                    r.returnStatus = POStatus.STATUS_EOP;
+                }
+                return r;
+            }
+            
+            // if we are here, we haven't consumed all input to be sent
+            // to the streaming binary. The Map-Reduce implementation checks
+            // this.parentPlan.endOfAllInput here to see whether it is being
+            // called from close(); in the local, pull-based execution model
+            // every call takes the end-of-all-input path below.
+                Result r = getNextHelper(t);
+                if(r.returnStatus == POStatus.STATUS_EOP) {
+                    // we have now seen *ALL* possible input
+                    // check if we ever had any real input
+                    // in the course of the map/reduce - if we did
+                    // then "initialized" will be true. If not, just
+                    // send EOP down.
+                    if(initialized) {
+                        // signal End of ALL input to the Executable Manager's 
+                        // Input handler thread
+                        binaryInputQueue.put(r);
+                        // note this state for future calls
+                        allInputFromPredecessorConsumed  = true;
+                        // look for output from binary
+                        r = binaryOutputQueue.take();
+                        if(r.returnStatus == POStatus.STATUS_EOS) {
+                            // Same as above: EOS means the binary has sent
+                            // us all of its output, so convert it to an EOP
+                            // for the successor in the pipeline.
+                            r.returnStatus = POStatus.STATUS_EOP;
+                        }
+                    }
+                    
+                } else if(r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, all output from the streaming
+                    // binary has been sent to us, so send an EOP to the
+                    // successor in the pipeline. Once this EOP goes down,
+                    // getNext() should not be called again, so there is no
+                    // need to set a flag noting we saw all output from
+                    // the binary.
+                    r.returnStatus = POStatus.STATUS_EOP;
+                }
+                return r;
+        } catch(Exception e) {
+            throw new ExecException("Error while trying to get next result in POStream", e);
+        }
+    }
+
+}

Modified: hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java (original)
+++ hadoop/pig/branches/load-store-redesign/src/org/apache/pig/tools/pigstats/PigStats.java Fri Jan  8 18:17:07 2010
@@ -39,11 +39,13 @@
 import org.apache.hadoop.mapred.jobcontrol.Job;
 import org.apache.hadoop.mapred.jobcontrol.JobControl;
 import org.apache.pig.ExecType;
+import org.apache.pig.PigCounters;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROperPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.data.BagFactory;
 import org.apache.pig.impl.util.ObjectSerializer;
 
 public class PigStats {
@@ -186,6 +188,9 @@
                         jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_INPUT_RECORDS").getCounter())).toString());
                         jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", (Long.valueOf(taskgroup.getCounterForName("REDUCE_OUTPUT_RECORDS").getCounter())).toString());
                         jobStats.put("PIG_STATS_BYTES_WRITTEN", (Long.valueOf(hdfsgroup.getCounterForName("HDFS_BYTES_WRITTEN").getCounter())).toString());
+                        jobStats.put("PIG_STATS_SMM_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.SPILLABLE_MEMORY_MANAGER_SPILL_COUNT).getCounter())).toString() );
+                        jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", (Long.valueOf(counters.findCounter(PigCounters.PROACTIVE_SPILL_COUNT).getCounter())).toString() );
+
                     }
                     else
                     {
@@ -194,6 +199,8 @@
                         jobStats.put("PIG_STATS_REDUCE_INPUT_RECORDS", "-1");
                         jobStats.put("PIG_STATS_REDUCE_OUTPUT_RECORDS", "-1");
                         jobStats.put("PIG_STATS_BYTES_WRITTEN", "-1");
+                        jobStats.put("PIG_STATS_SMM_SPILL_COUNT", "-1");
+                        jobStats.put("PIG_STATS_PROACTIVE_SPILL_COUNT", "-1");
                     }
                     
                 } catch (IOException e) {
@@ -294,6 +301,21 @@
     	
     }
     
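+    /**
+     * Sums the SpillableMemoryManager spill counts across the root jobs;
+     * returns -1 if the count was unavailable for any of them.
+     */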
+    public long getSMMSpillCount() {
+        long spillCount = 0;
+        for (String jid : rootJobIDs) {
+            Map<String, String> jobStats = stats.get(jid);
+            if (jobStats == null) continue;
+            long count = Long.parseLong(jobStats.get("PIG_STATS_SMM_SPILL_COUNT"));
+            if (count == -1L) {
+                // the count was unavailable for this job, so the total is unknown
+                spillCount = -1L;
+                break;
+            }
+            spillCount += count;
+        }
+        return spillCount;
+    }
+    
     private long getLocalBytesWritten() {
     	for(PhysicalOperator op : php.getLeaves())
     		return Long.parseLong(stats.get(op.toString()).get("PIG_STATS_LOCAL_BYTES_WRITTEN"));

Modified: hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml (original)
+++ hadoop/pig/branches/load-store-redesign/test/findbugsExcludeFile.xml Fri Jan  8 18:17:07 2010
@@ -324,5 +324,9 @@
         <Field name = "res" />
         <Bug pattern="MF_CLASS_MASKS_FIELD" />
     </Match>
-    
+    <Match>
+        <Class name="org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher" />
+        <Method name = "launchPig" />
+        <Bug pattern="DE_MIGHT_IGNORE" />
+    </Match>
 </FindBugsFilter>

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestBZip.java Fri Jan  8 18:17:07 2010
@@ -28,8 +28,8 @@
 
 import org.apache.pig.ExecType;
 import org.apache.pig.PigServer;
+import org.apache.pig.test.utils.LocalSeekableInputStream;
 import org.apache.pig.backend.datastorage.ElementDescriptor;
-import org.apache.pig.backend.local.datastorage.LocalSeekableInputStream;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.tools.bzip2r.CBZip2InputStream;

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestEvalPipeline2.java Fri Jan  8 18:17:07 2010
@@ -424,4 +424,39 @@
         
         assertTrue(iter.hasNext()==false);
     }
+    
+    // See PIG-761
+    @Test
+    public void testLimitPOPackageAnnotator() throws Exception{
+        File tmpFile1 = File.createTempFile("test1", "txt");
+        PrintStream ps1 = new PrintStream(new FileOutputStream(tmpFile1));
+        ps1.println("1\t2\t3");
+        ps1.println("2\t5\t2");
+        ps1.close();
+        
+        File tmpFile2 = File.createTempFile("test2", "txt");
+        PrintStream ps2 = new PrintStream(new FileOutputStream(tmpFile2));
+        ps2.println("1\t1");
+        ps2.println("2\t2");
+        ps2.close();
+
+        pigServer.registerQuery("A = LOAD '" + Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = LOAD '" + Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("C = LIMIT B 100;");
+        pigServer.registerQuery("D = COGROUP C BY b0, A BY a0 PARALLEL 2;");
+        Iterator<Tuple> iter = pigServer.openIterator("D");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertEquals("(1,{(1,1)},{(1,2,3)})", t.toString());
+        
+        assertTrue(iter.hasNext());
+        t = iter.next();
+        
+        assertEquals("(2,{(2,2)},{(2,5,2)})", t.toString());
+        
+        assertFalse(iter.hasNext());
+    }
+
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLoadFunc.java Fri Jan  8 18:17:07 2010
@@ -74,7 +74,7 @@
     public void testGetAbsolutePath3() throws IOException {
         // test case: remote hdfs path
         String absPath = "hdfs://myhost.mydomain:37765/data/passwd";
-        Assert.assertEquals(curHdfsRoot + "/data/passwd",
+        Assert.assertEquals(absPath,
                 LoadFunc.getAbsolutePath(absPath, curHdfsDir));      
     }
     

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestLocalPOSplit.java Fri Jan  8 18:17:07 2010
@@ -37,7 +37,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
+import org.apache.pig.pen.LocalLogToPhyTranslationVisitor;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestMultiQuery.java Fri Jan  8 18:17:07 2010
@@ -111,6 +111,63 @@
     public void tearDown() throws Exception {
         myPig = null;
     }
+
+    public void testMultiQueryJiraPig1171() {
+
+        // test case: Problems with some top N queries
+        
+        String INPUT_FILE = "abc";
+        
+        try {
+    
+            PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+            w.println("1\tapple\t3");
+            w.println("2\torange\t4");
+            w.println("3\tpersimmon\t5");
+            w.close();
+    
+            Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+           
+            myPig.setBatchOn();
+    
+            myPig.registerQuery("A = load '" + INPUT_FILE 
+                    + "' as (a:long, b, c);");
+            myPig.registerQuery("A1 = Order A by a desc;");
+            myPig.registerQuery("A2 = limit A1 1;");
+            myPig.registerQuery("B = load '" + INPUT_FILE 
+                    + "' as (a:long, b, c);");
+            myPig.registerQuery("B1 = Order B by a desc;");
+            myPig.registerQuery("B2 = limit B1 1;");
+            
+            myPig.registerQuery("C = cross A2, B2;");
+            
+            Iterator<Tuple> iter = myPig.openIterator("C");
+
+            List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+                    new String[] { 
+                            "(3L,'persimmon',5,3L,'persimmon',5)"
+                    });
+            
+            int counter = 0;
+            while (iter.hasNext()) {
+                assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());      
+            }
+
+            assertEquals(expectedResults.size(), counter);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } finally {
+            new File(INPUT_FILE).delete();
+            try {
+                Util.deleteFile(cluster, INPUT_FILE);
+            } catch (IOException e) {
+                e.printStackTrace();
+                Assert.fail();
+            }
+        }
+    }
     
     public void testMultiQueryJiraPig1157() {
 

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCogroup.java Fri Jan  8 18:17:07 2010
@@ -32,7 +32,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
-import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
+import org.apache.pig.pen.physicalOperators.POCogroup;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPOCross.java Fri Jan  8 18:17:07 2010
@@ -28,7 +28,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
-import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCross;
+import org.apache.pig.pen.physicalOperators.POCross;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPruneColumn.java Fri Jan  8 18:17:07 2010
@@ -1656,4 +1656,115 @@
             "No map keys pruned for C"}));
     }
 
+    // See PIG-1165
+    @Test
+    public void testOrderbyWrongSignature() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1);");
+        pigServer.registerQuery("C = order A by a1;");
+        pigServer.registerQuery("D = join C by a1, B by b0;");
+        pigServer.registerQuery("E = foreach D generate a1, b0, b1;");
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+        
+        assertTrue(t.size()==3);
+        assertTrue(t.toString().equals("(2,2,2)"));
+        
+        assertFalse(iter.hasNext());
+        
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $2", 
+            "No map keys pruned for A", "No column pruned for B", "No map keys pruned for B"}));
+    }
+    
+    // See PIG-1146
+    @Test
+    public void testUnionMixedPruning() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:chararray, a2);");
+        pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b2);");
+        pigServer.registerQuery("C = foreach B generate b0, 'hello', b2;");
+        pigServer.registerQuery("D = union A, C;");
+        pigServer.registerQuery("E = foreach D generate $0, $2;");
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        Collection<String> results = new HashSet<String>();
+        results.add("(1,3)");
+        results.add("(2,2)");
+        results.add("(1,1)");
+        results.add("(2,2)");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==2);
+        assertTrue(results.contains(t.toString()));
+
+        assertFalse(iter.hasNext());
+
+        assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+            "No map keys pruned for A", "No column pruned for B",
+            "No map keys pruned for B"}));
+    }
+    
+    // See PIG-1176
+    @Test
+    public void testUnionMixedSchemaPruning() throws Exception {
+        pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
+        pigServer.registerQuery("B = foreach A generate a0;");
+        pigServer.registerQuery("C = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "';");
+        pigServer.registerQuery("D = foreach C generate $0;");
+        pigServer.registerQuery("E = union B, D;");
+        Iterator<Tuple> iter = pigServer.openIterator("E");
+        Collection<String> results = new HashSet<String>();
+        results.add("(1)");
+        results.add("(2)");
+        results.add("(1)");
+        results.add("(2)");
+
+        assertTrue(iter.hasNext());
+        Tuple t = iter.next();
+
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertTrue(iter.hasNext());
+        t = iter.next();
+
+        assertTrue(t.size()==1);
+        assertTrue(results.contains(t.toString()));
+
+        assertFalse(iter.hasNext());
+
+        assertTrue(emptyLogFileMessage());
+    }
 }

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestPushDownForeachFlatten.java Fri Jan  8 18:17:07 2010
@@ -977,5 +977,27 @@
 
     }
 
+    // See PIG-1172
+    @Test
+    public void testForeachJoinRequiredField() throws Exception {
+        planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});");
+        planTester.buildPlan("B = FOREACH A generate flatten($0);");
+        planTester.buildPlan("C = load '3.txt' AS (c0, c1);");
+        planTester.buildPlan("D = JOIN B by a1, C by c1;");
+        LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        
+        planTester.setPlan(lp);
+        planTester.setProjectionMap(lp);
+        planTester.rebuildSchema(lp);
+        
+        PushDownForeachFlatten pushDownForeach = new PushDownForeachFlatten(lp);
+
+        LOLoad loada = (LOLoad) lp.getRoots().get(0);
+        
+        assertFalse(pushDownForeach.check(lp.getSuccessors(loada)));
+        assertFalse(pushDownForeach.getSwap());
+        assertFalse(pushDownForeach.getInsertBetween());
+    }
+
 }
 

Modified: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java?rev=897283&r1=897282&r2=897283&view=diff
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java (original)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/TestStore.java Fri Jan  8 18:17:07 2010
@@ -17,12 +17,12 @@
  */
 package org.apache.pig.test;
 
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileReader;
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -44,6 +44,12 @@
 import org.apache.pig.data.DefaultTuple;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceLauncher;
+import org.apache.pig.pen.physicalOperators.POCounter;
+import org.apache.pig.tools.pigstats.PigStats;
 import org.apache.pig.impl.logicalLayer.LOStore;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
@@ -58,22 +64,24 @@
 import org.junit.Test;
 
 public class TestStore extends junit.framework.TestCase {
-    
+    POStore st;
     DataBag inpDB;
     static MiniCluster cluster = MiniCluster.buildCluster();
-    PigServer pig;
     PigContext pc;
+    POProject proj;
+    PigServer pig;
+    POCounter pcount;
+        
     String inputFileName;
     String outputFileName;
-    
+        
     @Override
     @Before
     public void setUp() throws Exception {
         pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
         pc = pig.getPigContext();
         inputFileName = "/tmp/TestStore-" + new Random().nextLong() + ".txt";
-        outputFileName = "/tmp/TestStore-output-" + new Random().nextLong() + 
-        ".txt";
+        outputFileName = "/tmp/TestStore-output-" + new Random().nextLong() + ".txt";
     }
 
     @Override
@@ -87,14 +95,26 @@
     private void storeAndCopyLocally(DataBag inpDB) throws Exception {
         setUpInputFileOnCluster(inpDB);
         String script = "a = load '" + inputFileName + "'; " +
-        "store a into '" + outputFileName + "' using PigStorage(':');" +
-        		"fs -ls /tmp";
+                "store a into '" + outputFileName + "' using PigStorage(':');" +
+                "fs -ls /tmp";
         pig.setBatchOn();
         Util.registerMultiLineQuery(pig, script);
         pig.executeBatch();
         Util.copyFromClusterToLocal(cluster, outputFileName + "/part-m-00000", outputFileName);
     }
 
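+    // Builds a minimal physical plan (POProject -> POCounter -> POStore)
+    // and runs it with MapReduceLauncher in local mode, returning PigStats.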
+    private PigStats store() throws Exception {
+        PhysicalPlan pp = new PhysicalPlan();
+        pp.add(proj);
+        pp.add(st);
+        pp.add(pcount);
+        pp.connect(proj, pcount);
+        pp.connect(pcount, st);
+        pc.setExecType(ExecType.LOCAL);
+        return new MapReduceLauncher().launchPig(pp, "TestStore", pc);
+    }
+
     @Test
     public void testStore() throws Exception {
         inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
@@ -159,7 +179,6 @@
         inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100);
         storeAndCopyLocally(inpDB);
         PigStorage ps = new PigStorage(":");
-                
         int size = 0;
         BufferedReader br = new BufferedReader(new FileReader(outputFileName));
         for(String line=br.readLine();line!=null;line=br.readLine()){

Added: hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java?rev=897283&view=auto
==============================================================================
--- hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java (added)
+++ hadoop/pig/branches/load-store-redesign/test/org/apache/pig/test/utils/LocalSeekableInputStream.java Fri Jan  8 18:17:07 2010
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test.utils;
+
+import java.io.RandomAccessFile;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.pig.backend.datastorage.*;
+
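+/**
+ * A SeekableInputStream backed by a local RandomAccessFile, used by tests
+ * (e.g. TestBZip) in place of the org.apache.pig.backend.local.datastorage
+ * implementation they previously imported.
+ */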
+public class LocalSeekableInputStream extends SeekableInputStream {
+
+    protected RandomAccessFile file;
+    protected long curMark;
+    
+    public LocalSeekableInputStream(File file) throws FileNotFoundException {
+        this.file = new RandomAccessFile(file, "r");
+        this.curMark = 0;
+    }
+    
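+    /**
+     * Repositions the underlying RandomAccessFile: SEEK_SET seeks from the
+     * beginning of the file, SEEK_CUR from the current position, and
+     * SEEK_END from the end of the file.
+     */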
+    @Override
+    public void seek(long offset, FLAGS whence) throws IOException {
+        long targetPos;
+        
+        switch (whence) {
+        case SEEK_SET: {
+            targetPos = offset;
+            break;
+        }
+        case SEEK_CUR: {
+            targetPos = this.file.getFilePointer() + offset;
+            break;
+        }
+        case SEEK_END: {
+            targetPos = this.file.length() + offset;
+            break;
+        }
+        default: {
+            throw new IOException("Invalid seek option: " + whence);
+        }
+        }
+        
+        this.file.seek(targetPos);
+    }
+    
+    @Override
+    public long tell() throws IOException {
+        return this.file.getFilePointer();
+    }
+    
+    @Override
+    public int read() throws IOException {
+        return this.file.read();
+    }
+    
+    @Override
+    public int read(byte[] b) throws IOException {
+        return this.file.read(b);
+    }
+        
+    @Override
+    public int read(byte[] b, int off, int len ) throws IOException {
+        return this.file.read(b, off, len);
+    }
+    
+    @Override
+    public int available() throws IOException {
+        return (int) (this.file.length() - this.file.getFilePointer());
+    }
+    
+    @Override
+    public long skip(long n) throws IOException {
+        long skipped = 0;
+
+        if (n > 0) {
+            // never skip past the end of the file, and report only the
+            // number of bytes actually skipped
+            skipped = Math.min(n, this.file.length() - tell());
+            seek(skipped, FLAGS.SEEK_CUR);
+        }
+
+        return skipped;
+    }
+    
+    @Override
+    public void close() throws IOException {
+        this.file.close();
+    }
+    
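+    /**
+     * Remembers the current file position; readlimit is ignored since the
+     * backing file can seek anywhere.
+     */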
+    @Override
+    public void mark(int readlimit) {
+        try {
+            this.curMark = tell();
+        }
+        catch (IOException e) {
+            // ignored: mark() is not allowed to throw, so on failure we
+            // simply keep the previous mark
+        }
+    }
+    
+    @Override
+    public void reset() throws IOException {
+        seek(this.curMark, FLAGS.SEEK_SET);
+    }
+    
+    @Override
+    public boolean markSupported() {
+        return true;
+    }
+}