Posted to commits@pig.apache.org by ga...@apache.org on 2008/12/11 23:29:29 UTC

svn commit: r725846 [1/3] - in /hadoop/pig/branches/types: src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physica...

Author: gates
Date: Thu Dec 11 14:29:29 2008
New Revision: 725846

URL: http://svn.apache.org/viewvc?rev=725846&view=rev
Log:
PIG-543.  Add a true local mode instead of using the local map reduce cluster.  Contributed by Shubham.
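
For a sense of how the new mode is driven, here is a minimal sketch mirroring the
added tests (the input path is hypothetical; "local" now selects the true local
engine rather than a local map-reduce cluster):

    import java.util.Iterator;

    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    public class LocalModeExample {
        public static void main(String[] args) throws Exception {
            // Same constructor the added tests use (see TestAlgebraicEvalLocal).
            PigServer pig = new PigServer("local");
            // Same query shape as testSimpleCount; '/tmp/input.txt' is a stand-in.
            pig.registerQuery("myid = foreach (group (load '/tmp/input.txt') all) generate COUNT($1);");
            Iterator<Tuple> it = pig.openIterator("myid");
            while (it.hasNext()) {
                System.out.println(it.next());
            }
        }
    }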


Added:
    hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestEvalPipelineLocal.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestForEachNestedPlanLocal.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestLocal.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestLocal2.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestPOCross.java
    hadoop/pig/branches/types/test/org/apache/pig/test/TestStreamingLocal.java
Modified:
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
    hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
    hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Dec 11 14:29:29 2008
@@ -71,7 +71,7 @@
 
     private Log log = LogFactory.getLog(getClass());
 
-    PigContext pc;
+    protected PigContext pc;
 
     LoadFunc load;
 

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Dec 11 14:29:29 2008
@@ -261,5 +261,10 @@
         
     }
 
+    public void visitCross(POCross cross) {
+        // TODO Auto-generated method stub
+        
+    }
+
 
 }

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Thu Dec 11 14:29:29 2008
@@ -45,15 +45,15 @@
     private StreamingCommand command;               // Actual command to be run
     private Properties properties;
 
-    private boolean initialized = false;
+    protected boolean initialized = false;
     
-    private BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
+    protected BlockingQueue<Result> binaryOutputQueue = new ArrayBlockingQueue<Result>(1);
 
-    private BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
+    protected BlockingQueue<Result> binaryInputQueue = new ArrayBlockingQueue<Result>(1);
 
-    private boolean allInputFromPredecessorConsumed = false;
+    protected boolean allInputFromPredecessorConsumed = false;
 
-    private boolean allOutputFromBinaryProcessed = false;
+    protected boolean allOutputFromBinaryProcessed = false;
 
     public POStream(OperatorKey k, ExecutableManager executableManager, 
                       StreamingCommand command, Properties properties) {

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalExecutionEngine.java Thu Dec 11 14:29:29 2008
@@ -54,29 +54,30 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PlanPrinter;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.local.executionengine.physicalLayer.LocalLogToPhyTranslationVisitor;
 import org.apache.pig.impl.plan.VisitorException;
 import java.util.Iterator;
 
-
 public class LocalExecutionEngine implements ExecutionEngine {
 
     protected PigContext pigContext;
     protected DataStorage ds;
     protected NodeIdGenerator nodeIdGenerator;
 
-    // key: the operator key from the logical plan that originated the physical plan
+    // key: the operator key from the logical plan that originated the physical
+    // plan
     // val: the operator key for the root of the physical plan
     protected Map<OperatorKey, OperatorKey> logicalToPhysicalKeys;
-    
+
     protected Map<OperatorKey, PhysicalOperator> physicalOpTable;
-    
+
     // map from LOGICAL key to info about the execution
     protected Map<OperatorKey, LocalResult> materializedResults;
-    
+
     public LocalExecutionEngine(PigContext pigContext) {
         this.pigContext = pigContext;
         this.ds = pigContext.getLfs();
-        this.nodeIdGenerator = NodeIdGenerator.getGenerator(); 
+        this.nodeIdGenerator = NodeIdGenerator.getGenerator();
         this.logicalToPhysicalKeys = new HashMap<OperatorKey, OperatorKey>();
         this.physicalOpTable = new HashMap<OperatorKey, PhysicalOperator>();
         this.materializedResults = new HashMap<OperatorKey, LocalResult>();
@@ -85,7 +86,7 @@
     public DataStorage getDataStorage() {
         return this.ds;
     }
-    
+
     public void init() throws ExecException {
         ;
     }
@@ -93,30 +94,46 @@
     public void close() throws ExecException {
         ;
     }
-        
+
     public Properties getConfiguration() throws ExecException {
         return this.pigContext.getProperties();
     }
-        
-    public void updateConfiguration(Properties newConfiguration) 
-        throws ExecException {
+
+    public void updateConfiguration(Properties newConfiguration)
+            throws ExecException {
         // there is nothing to do here.
     }
-        
+
     public Map<String, Object> getStatistics() throws ExecException {
         throw new UnsupportedOperationException();
     }
 
-    
-    public PhysicalPlan compile(LogicalPlan plan,
-                                Properties properties) throws ExecException {
+    // public PhysicalPlan compile(LogicalPlan plan,
+    // Properties properties) throws ExecException {
+    // if (plan == null) {
+    // throw new ExecException("No Plan to compile");
+    // }
+    //
+    // try {
+    // LogToPhyTranslationVisitor translator =
+    // new LogToPhyTranslationVisitor(plan);
+    // translator.setPigContext(pigContext);
+    // translator.visit();
+    // return translator.getPhysicalPlan();
+    // } catch (VisitorException ve) {
+    // throw new ExecException(ve);
+    // }
+    // }
+
+    public PhysicalPlan compile(LogicalPlan plan, Properties properties)
+            throws ExecException {
         if (plan == null) {
             throw new ExecException("No Plan to compile");
         }
 
         try {
-            LogToPhyTranslationVisitor translator = 
-                new LogToPhyTranslationVisitor(plan);
+            LocalLogToPhyTranslationVisitor translator = new LocalLogToPhyTranslationVisitor(
+                    plan);
             translator.setPigContext(pigContext);
             translator.visit();
             return translator.getPhysicalPlan();
@@ -125,42 +142,45 @@
         }
     }
 
-    public ExecJob execute(PhysicalPlan plan,
-                            String jobName) throws ExecException {
+    public ExecJob execute(PhysicalPlan plan, String jobName)
+            throws ExecException {
         try {
-            PhysicalOperator leaf = (PhysicalOperator)plan.getLeaves().get(0);
+            PhysicalOperator leaf = (PhysicalOperator) plan.getLeaves().get(0);
             FileSpec spec = null;
-            if(!(leaf instanceof POStore)){
+            if (!(leaf instanceof POStore)) {
                 String scope = leaf.getOperatorKey().getScope();
                 POStore str = new POStore(new OperatorKey(scope,
-                    NodeIdGenerator.getGenerator().getNextNodeId(scope)));
+                        NodeIdGenerator.getGenerator().getNextNodeId(scope)));
                 str.setPc(pigContext);
                 spec = new FileSpec(FileLocalizer.getTemporaryPath(null,
-                    pigContext).toString(),
-                    new FuncSpec(BinStorage.class.getName()));
+                        pigContext).toString(), new FuncSpec(BinStorage.class
+                        .getName()));
                 str.setSFile(spec);
                 plan.addAsLeaf(str);
-            }
-            else{
-                spec = ((POStore)leaf).getSFile();
+            } else {
+                spec = ((POStore) leaf).getSFile();
             }
 
-            LocalLauncher launcher = new LocalLauncher();
+            // LocalLauncher launcher = new LocalLauncher();
+            LocalPigLauncher launcher = new LocalPigLauncher();
             boolean success = launcher.launchPig(plan, jobName, pigContext);
-            if(success)
-                return new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext, spec);
+            if (success)
+                return new LocalJob(ExecJob.JOB_STATUS.COMPLETED, pigContext,
+                        spec);
             else
                 return new LocalJob(ExecJob.JOB_STATUS.FAILED, pigContext, null);
         } catch (Exception e) {
-            // There are a lot of exceptions thrown by the launcher.  If this
-            // is an ExecException, just let it through.  Else wrap it.
-            if (e instanceof ExecException) throw (ExecException)e;
-            else throw new ExecException(e.getMessage(), e);
+            // There are a lot of exceptions thrown by the launcher. If this
+            // is an ExecException, just let it through. Else wrap it.
+            if (e instanceof ExecException)
+                throw (ExecException) e;
+            else
+                throw new ExecException(e.getMessage(), e);
         }
     }
 
-    public LocalJob submit(PhysicalPlan plan,
-                           String jobName) throws ExecException {
+    public LocalJob submit(PhysicalPlan plan, String jobName)
+            throws ExecException {
         throw new UnsupportedOperationException();
     }
 
@@ -172,33 +192,32 @@
 
             ExecTools.checkLeafIsStore(plan, pigContext);
 
-            LocalLauncher launcher = new LocalLauncher();
+            // LocalLauncher launcher = new LocalLauncher();
+            LocalPigLauncher launcher = new LocalPigLauncher();
             launcher.explain(plan, pigContext, stream);
         } catch (Exception ve) {
             throw new RuntimeException(ve);
         }
     }
 
-    public Collection<ExecJob> runningJobs(Properties properties) throws ExecException {
+    public Collection<ExecJob> runningJobs(Properties properties)
+            throws ExecException {
         return new HashSet<ExecJob>();
     }
-    
+
     public Collection<String> activeScopes() throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
     public void reclaimScope(String scope) throws ExecException {
         throw new UnsupportedOperationException();
     }
-    
+
     private OperatorKey doCompile(OperatorKey logicalKey,
-                                  Map<OperatorKey, LogicalOperator> logicalOpTable,
-                                  Properties properties) 
-            throws ExecException {
-        
+            Map<OperatorKey, LogicalOperator> logicalOpTable,
+            Properties properties) throws ExecException {
+
         return null;
     }
-    
-}
-
 
+}

Added: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java?rev=725846&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/LocalPigLauncher.java Thu Dec 11 14:29:29 2008
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.backend.local.executionengine;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.JobCreationException;
+import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.Launcher;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+public class LocalPigLauncher extends Launcher {
+
+    Log log = LogFactory.getLog(getClass());
+
+    @Override
+    public void explain(PhysicalPlan pp, PigContext pc, PrintStream ps)
+            throws PlanException, VisitorException, IOException {
+        // TODO Auto-generated method stub
+        pp.explain(ps);
+        ps.append('\n');
+    }
+
+    @Override
+    public boolean launchPig(PhysicalPlan php, String grpName, PigContext pc)
+            throws PlanException, VisitorException, IOException, ExecException,
+            JobCreationException {
+        // TODO Auto-generated method stub
+        List<PhysicalOperator> stores = php.getLeaves();
+        int noJobs = stores.size();
+        int failedJobs = 0;
+
+        for (PhysicalOperator op : stores) {
+            POStore store = (POStore) op;
+            Result res = store.store();
+            if (res.returnStatus != POStatus.STATUS_EOP)
+                failedJobs++;
+        }
+
+        if (failedJobs == 0) {
+            log.info("100% complete!");
+            log.info("Success!!");
+            return true;
+        } else {
+            log.info("Failed jobs!!");
+            log.info(failedJobs + " out of " + noJobs + " failed!");
+        }
+        return false;
+
+    }
+
+}

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/LocalLogToPhyTranslationVisitor.java Thu Dec 11 14:29:29 2008
@@ -26,15 +26,18 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.LogToPhyTranslationVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrangeForIllustrate;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCogroup;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POCross;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplit;
 import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POSplitOutput;
+import org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators.POStreamLocal;
 import org.apache.pig.data.DataType;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOCross;
 import org.apache.pig.impl.logicalLayer.LOSplit;
 import org.apache.pig.impl.logicalLayer.LOSplitOutput;
+import org.apache.pig.impl.logicalLayer.LOStream;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.OperatorKey;
@@ -62,7 +65,7 @@
         List<LogicalOperator> inputs = cg.getInputs();
         
         POCogroup poc = new POCogroup(new OperatorKey(scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
-        
+        poc.setInner(cg.getInner());
         currentPlan.add(poc);
         
         int count = 0;
@@ -159,5 +162,48 @@
             throw new VisitorException(e);
         }
     }
+    
+    @Override
+    public void visit(LOStream stream) throws VisitorException {
+        String scope = stream.getOperatorKey().scope;
+        POStreamLocal poStream = new POStreamLocal(new OperatorKey(scope, nodeGen
+                .getNextNodeId(scope)), stream.getExecutableManager(), 
+                stream.getStreamingCommand(), pc.getProperties());
+        currentPlan.add(poStream);
+        LogToPhyMap.put(stream, poStream);
+        
+        List<LogicalOperator> op = stream.getPlan().getPredecessors(stream);
+
+        PhysicalOperator from = LogToPhyMap.get(op.get(0));
+        try {
+            currentPlan.connect(from, poStream);
+        } catch (PlanException e) {
+            log.error("Invalid physical operators in the physical plan"
+                    + e.getMessage());
+            throw new VisitorException(e);
+        }
+    }
+    
+    @Override
+    public void visit(LOCross cross) throws VisitorException {
+        String scope = cross.getOperatorKey().scope;
+        
+        POCross pocross = new POCross(new OperatorKey(scope, nodeGen.getNextNodeId(scope)));
+        LogToPhyMap.put(cross, pocross);
+        currentPlan.add(pocross);
+        
+        
+        for(LogicalOperator in : cross.getInputs()) {
+            PhysicalOperator from = LogToPhyMap.get(in);
+            try {
+                currentPlan.connect(from, pocross);
+            } catch (PlanException e) {
+                log.error("Invalid physical operators in the physical plan"
+                        + e.getMessage());
+                throw new VisitorException(e);
+            }
+        }
+        //currentPlan.explain(System.out);
+    }
 
 }

Modified: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java (original)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCogroup.java Thu Dec 11 14:29:29 2008
@@ -51,6 +51,7 @@
     
     Tuple[] data = null;
     Iterator<Tuple>[] its = null;
+    boolean[] inner;
 
     public POCogroup(OperatorKey k) {
 	super(k);
@@ -69,6 +70,10 @@
     public POCogroup(OperatorKey k, int rp, List<PhysicalOperator> inp) {
 	super(k, rp, inp);
     }
+    
+    public void setInner(boolean[] inner) {
+        this.inner = inner;
+    }
 
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
@@ -149,6 +154,14 @@
 	    res.result = output;
 	
 	res.returnStatus = POStatus.STATUS_OK;
+//    System.out.println(output);
+	for(int i = 0; i < size; i++) {
+	    if(inner != null && inner[i] && ((DataBag)output.get(i+1)).size() == 0) {
+	        res.returnStatus = POStatus.STATUS_NULL;
+	        break;
+	    }
+	}
+	
 	
 	return res;
     }
@@ -160,15 +173,18 @@
 	for(int i = 0; i < size; i++) {
 	    DataBag bag = new SortedDataBag(new groupComparator());
 	    for(Result input = inputs.get(i).getNext(dummyTuple); input.returnStatus != POStatus.STATUS_EOP; input = inputs.get(i).getNext(dummyTuple)) {
-		if(input.returnStatus == POStatus.STATUS_ERR) {
-		    throw new ExecException("Error accumulating output at local Cogroup operator");
-		}
-		bag.add((Tuple) input.result);
+	        if(input.returnStatus == POStatus.STATUS_ERR) {
+	            throw new ExecException("Error accumulating output at local Cogroup operator");
+	        }
+	        if(input.returnStatus == POStatus.STATUS_NULL)
+	            continue;
+	        bag.add((Tuple) input.result);
 	    }
+	    
 	    its[i] = bag.iterator();
 	    data[i] = its[i].next();
 	}
-	
+
     }
     
 //    private Tuple getSmallest(Tuple[] data) {
@@ -191,7 +207,7 @@
 		t = data[i];
		continue; // since the previous data was probably null, we don't really need a comparison
 	    }
-	    if(comp.compare(t, (Tuple) data[i]) < 0) 
+	    if(comp.compare(t, (Tuple) data[i]) > 0) 
 		t = data[i];
 	}
 	return t;
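
The inner[] handling added above changes the local COGROUP semantics: for an
input marked INNER, a group whose bag for that input is empty is suppressed
(getNext returns STATUS_NULL for it). A self-contained sketch of just that
emptiness check, with plain Lists standing in for DataBags and hypothetical
class/method names:

    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;

    public class InnerCogroupCheck {
        // For COGROUP ... INNER on input i, a group is dropped when the
        // bag collected for input i is empty.
        static boolean keepGroup(List<List<String>> bags, boolean[] inner) {
            for (int i = 0; i < bags.size(); i++) {
                if (inner != null && inner[i] && bags.get(i).isEmpty()) {
                    return false; // POCogroup answers STATUS_NULL here
                }
            }
            return true;
        }

        public static void main(String[] args) {
            List<List<String>> group = Arrays.asList(
                    Arrays.asList("(1,a)"),            // input 0 has a tuple
                    Collections.<String>emptyList());  // input 1 is empty
            System.out.println(keepGroup(group, new boolean[] { false, false })); // true
            System.out.println(keepGroup(group, new boolean[] { false, true }));  // false
        }
    }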

Added: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java?rev=725846&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POCross.java Thu Dec 11 14:29:29 2008
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+
+/**
+ * This is a local implementation of cross. It is a blocking operator:
+ * it accumulates its inputs into data bags and then applies logic similar
+ * to foreach flatten(*) to produce the output tuples.
+ * 
+ * @author shubhamc
+ *
+ */
+public class POCross extends PhysicalOperator {
+    
+    DataBag [] inputBags;
+    Tuple [] data;
+    Iterator [] its;
+
+    public POCross(OperatorKey k) {
+        super(k);
+        // TODO Auto-generated constructor stub
+    }
+
+    public POCross(OperatorKey k, int rp) {
+        super(k, rp);
+        // TODO Auto-generated constructor stub
+    }
+
+    public POCross(OperatorKey k, List<PhysicalOperator> inp) {
+        super(k, inp);
+        // TODO Auto-generated constructor stub
+    }
+
+    public POCross(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        // TODO Auto-generated constructor stub
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        // TODO Auto-generated method stub
+        v.visitCross(this);
+
+    }
+    
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        Result res = new Result();
+        int noItems = inputs.size();
+        if(inputBags == null) {
+            accumulateData();
+        }
+        
+        if(its != null) {
+            //we check if we are done with processing
+            //we do that by checking if all the iterators are used up
+            boolean finished = true;
+            for(int i = 0; i < its.length; i++) {
+                finished &= !its[i].hasNext();
+            }
+            if(finished) {
+                res.returnStatus = POStatus.STATUS_EOP;
+                return res;
+            }
+            
+        }
+        
+        if(data == null) {
+            //getNext being called for the first time or starting on new input data
+            //we instantiate the template array and start populating it with data
+            data = new Tuple[noItems];
+            for(int i = 0; i < noItems; ++i) {
+                data[i] = (Tuple) its[i].next();
+
+            }
+            res.result = CreateTuple(data);
+            res.returnStatus = POStatus.STATUS_OK;
+            return res;
+        } else {
+            for(int index = noItems - 1; index >= 0; --index) {
+                if(its[index].hasNext()) {
+                    data[index] =  (Tuple) its[index].next();
+                    res.result = CreateTuple(data);
+                    res.returnStatus = POStatus.STATUS_OK;
+                    return res;
+                }
+                else{
+                    // reset this index's iterator so cross product can be achieved
+                    // we would be resetting this way only for the indexes from the end
+                    // when the first index which needs to be flattened has reached the
+                    // last element in its iterator, we won't come here - instead, we reset
+                    // all iterators at the beginning of this method.
+                    its[index] = (inputBags[index]).iterator();
+                    data[index] = (Tuple) its[index].next();
+                }
+
+            }
+        }
+        
+        return null;
+    }
+    
+    private void accumulateData() throws ExecException {
+        int count = 0;
+        inputBags = new DataBag[inputs.size()];
+        
+        its = new Iterator[inputs.size()];
+        for(PhysicalOperator op : inputs) {
+            DataBag bag = BagFactory.getInstance().newDefaultBag();
+            inputBags[count] = bag;
+            for(Result res = op.getNext(dummyTuple); res.returnStatus != POStatus.STATUS_EOP; res = op.getNext(dummyTuple)) {
+                if(res.returnStatus == POStatus.STATUS_NULL)
+                    continue;
+                if(res.returnStatus == POStatus.STATUS_ERR)
+                    throw new ExecException("Error accumulating data in the local Cross operator");
+                if(res.returnStatus == POStatus.STATUS_OK)
+                    bag.add((Tuple) res.result);
+            }
+            its[count++] = bag.iterator();
+        }
+    }
+    
+    private Tuple CreateTuple(Tuple[] data) throws ExecException {
+        Tuple out =  TupleFactory.getInstance().newTuple();
+        
+        for(int i = 0; i < data.length; ++i) {
+            Tuple t = data[i];
+            int size = t.size();
+            for(int j = 0; j < size; ++j) {
+                out.append(t.get(j));
+            }
+
+        }
+        
+        if(lineageTracer != null) {
+            ExampleTuple tOut = new ExampleTuple();
+            tOut.reference(out);
+            lineageTracer.insert(tOut);
+            for(int i = 0; i < data.length; i++) {
+                lineageTracer.union(tOut, data[i]);
+            }
+            return tOut;
+        }
+        return out;
+    }
+
+    @Override
+    public String name() {
+        // TODO Auto-generated method stub
+        return "POCrossLocal" + " - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        // TODO Auto-generated method stub
+        return true;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        // TODO Auto-generated method stub
+        return false;
+    }
+
+}
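
The getNext() above enumerates the cross product odometer-style: the rightmost
input advances on every call, and an exhausted iterator is reset while the next
input to the left advances. A standalone sketch of the same pattern
(hypothetical class name, plain Lists instead of DataBags):

    import java.util.Arrays;
    import java.util.List;

    public class OdometerCross {
        public static void main(String[] args) {
            List<List<String>> inputs = Arrays.asList(
                    Arrays.asList("1", "2"),
                    Arrays.asList("a", "b"));
            int n = inputs.size();
            int[] idx = new int[n];   // current position in each input
            boolean done = false;
            while (!done) {
                StringBuilder row = new StringBuilder();
                for (int i = 0; i < n; i++) {
                    row.append(inputs.get(i).get(idx[i])).append(' ');
                }
                System.out.println(row.toString().trim()); // 1 a, 1 b, 2 a, 2 b
                done = true;
                for (int i = n - 1; i >= 0; i--) { // advance rightmost, carry left
                    if (++idx[i] < inputs.get(i).size()) {
                        done = false;
                        break;
                    }
                    idx[i] = 0; // exhausted: reset and carry to the next input
                }
            }
        }
    }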

Added: hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java?rev=725846&view=auto
==============================================================================
--- hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java (added)
+++ hadoop/pig/branches/types/src/org/apache/pig/backend/local/executionengine/physicalLayer/relationalOperators/POStreamLocal.java Thu Dec 11 14:29:29 2008
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.local.executionengine.physicalLayer.relationalOperators;
+
+import java.util.Properties;
+
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.streaming.ExecutableManager;
+import org.apache.pig.impl.streaming.StreamingCommand;
+
+public class POStreamLocal extends POStream {
+
+    /**
+     * 
+     */
+    private static final long serialVersionUID = 2L;
+
+    public POStreamLocal(OperatorKey k, ExecutableManager executableManager,
+            StreamingCommand command, Properties properties) {
+        super(k, executableManager, command, properties);
+        // TODO Auto-generated constructor stub
+    }
+    
+    
+    /**
+     * This is different from the Map-Reduce implementation of POStream since
+     * there is no push model here. POStatus.STATUS_EOP signals the end of input
+     * and can be used to decide when to close the process's stdin.
+     */
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+        // The POStream Operator works with ExecutableManager to
+        // send input to the streaming binary and to get output
+        // from it. To achieve a tuple oriented behavior, two queues
+        // are used - one for output from the binary and one for 
+        // input to the binary. In each getNext() call:
+        // 1) If no more output is expected from the binary, an EOP is
+        // sent to the successor
+        // 2) If there is any output from the binary in the queue, it is passed
+        // down to the successor
+        // 3) If neither of these is true and it is possible to send
+        // input to the binary, the next tuple is fetched from the
+        // predecessor and passed to the binary
+        try {
+            // if we are being called AFTER all output from the streaming 
+            // binary has already been sent to us then just return EOP
+            // The "allOutputFromBinaryProcessed" flag is set when we see
+            // an EOS (End of Stream output) from streaming binary
+            if(allOutputFromBinaryProcessed) {
+                return new Result(POStatus.STATUS_EOP, null);
+            }
+            
+            // if we are here AFTER all map() calls have been completed
+            // AND AFTER we process all possible input to be sent to the
+            // streaming binary, then all we want to do is read output from
+            // the streaming binary
+            if(allInputFromPredecessorConsumed) {
+                Result r = binaryOutputQueue.take();
+                if(r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, it means all output
+                    // from the streaming binary has been sent to us
+                    // So we can send an EOP to the successor in
+                    // the pipeline. Also since we are being called
+                    // after all input from predecessor has been processed
+                    // it means we got here from a call from close() in
+                    // map or reduce. So once we send this EOP down, 
+                    // getNext() in POStream should never be called. So
+                    // we don't need to set any flag noting we saw all output
+                    // from binary
+                    r.returnStatus = POStatus.STATUS_EOP;
+                }
+                return(r);
+            }
+            
+            // if we are here, we haven't consumed all input to be sent
+            // to the streaming binary - check if we are being called
+            // from close() on the map or reduce
+            //if(this.parentPlan.endOfAllInput) {
+                Result r = getNextHelper(t);
+                if(r.returnStatus == POStatus.STATUS_EOP) {
+                    // we have now seen *ALL* possible input
+                    // check if we ever had any real input
+                    // in the course of the map/reduce - if we did
+                    // then "initialized" will be true. If not, just
+                    // send EOP down.
+                    if(initialized) {
+                        // signal End of ALL input to the Executable Manager's 
+                        // Input handler thread
+                        binaryInputQueue.put(r);
+                        // note this state for future calls
+                        allInputFromPredecessorConsumed  = true;
+                        // look for output from binary
+                        r = binaryOutputQueue.take();
+                        if(r.returnStatus == POStatus.STATUS_EOS) {
+                            // If we received EOS, it means all output
+                            // from the streaming binary has been sent to us
+                            // So we can send an EOP to the successor in
+                            // the pipeline. Also since we are being called
+                            // after all input from predecessor has been processed
+                            // it means we got here from a call from close() in
+                            // map or reduce. So once we send this EOP down, 
+                            // getNext() in POStream should never be called. So
+                            // we don't need to set any flag noting we saw all output
+                            // from binary
+                            r.returnStatus = POStatus.STATUS_EOP;
+                        }
+                    }
+                    
+                } else if(r.returnStatus == POStatus.STATUS_EOS) {
+                    // If we received EOS, it means all output
+                    // from the streaming binary has been sent to us
+                    // So we can send an EOP to the successor in
+                    // the pipeline. Also we are being called
+                    // from close() in map or reduce (this is so because
+                    // only then this.parentPlan.endOfAllInput is true).
+                    // So once we send this EOP down, getNext() in POStream
+                    // should never be called. So we don't need to set any 
+                    // flag noting we saw all output from binary
+                    r.returnStatus = POStatus.STATUS_EOP;
+                }
+                return r;
+//            } else {
+//                // we are not being called from close() - so
+//                // we must be called from either map() or reduce()
+//                // get the next Result from helper
+//                Result r = getNextHelper(t);
+//                if(r.returnStatus == POStatus.STATUS_EOS) {
+//                    // If we received EOS, it means all output
+//                    // from the streaming binary has been sent to us
+//                    // So we can send an EOP to the successor in
+//                    // the pipeline and also note this condition
+//                    // for future calls
+//                    r.returnStatus = POStatus.STATUS_EOP;
+//                    allOutputFromBinaryProcessed  = true;
+//                }
+//                return r;
+//            }
+            
+        } catch(Exception e) {
+            throw new ExecException("Error while trying to get next result in POStream", e);
+        }
+            
+        
+    }
+
+    
+
+}
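
The tuple-at-a-time behavior rests on the capacity-1 queues declared in
POStream (binaryInputQueue/binaryOutputQueue above): put() blocks until the
other side take()s. A minimal sketch of that handshake, with a plain thread
standing in for the streaming binary (class name and payloads hypothetical):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class QueueHandshake {
        public static void main(String[] args) throws InterruptedException {
            final BlockingQueue<String> toBinary = new ArrayBlockingQueue<String>(1);
            final BlockingQueue<String> fromBinary = new ArrayBlockingQueue<String>(1);

            Thread binary = new Thread(new Runnable() {
                public void run() {
                    try {
                        String in;
                        while (!(in = toBinary.take()).equals("EOS")) {
                            fromBinary.put("out:" + in); // echo-style "binary"
                        }
                        fromBinary.put("EOS"); // propagate end-of-stream
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            binary.start();

            toBinary.put("t1");                    // one tuple in ...
            System.out.println(fromBinary.take()); // ... one result out: "out:t1"
            toBinary.put("EOS");                   // signal end of all input
            System.out.println(fromBinary.take()); // "EOS" back: binary is done
        }
    }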

Modified: hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java?rev=725846&r1=725845&r2=725846&view=diff
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java (original)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/PigExecTestCase.java Thu Dec 11 14:29:29 2008
@@ -32,7 +32,7 @@
 
     protected final Log log = LogFactory.getLog(getClass());
     
-    protected ExecType execType = MAPREDUCE;
+    protected ExecType execType = LOCAL;
     
     private MiniCluster cluster;
     protected PigServer pigServer;

Added: hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java
URL: http://svn.apache.org/viewvc/hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java?rev=725846&view=auto
==============================================================================
--- hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java (added)
+++ hadoop/pig/branches/types/test/org/apache/pig/test/TestAlgebraicEvalLocal.java Thu Dec 11 14:29:29 2008
@@ -0,0 +1,305 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
+import java.util.Iterator;
+import java.util.Random;
+
+import junit.framework.TestCase;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.PigServer;
+import org.apache.pig.builtin.PigStorage;
+import org.apache.pig.data.DataByteArray;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+
+public class TestAlgebraicEvalLocal extends TestCase {
+    
+    private int LOOP_COUNT = 512;
+
+
+    private PigServer pig;
+    
+    @Before
+    @Override
+    protected void setUp() throws Exception {
+        pig = new PigServer("local");
+    }
+    
+    Boolean[] nullFlags = new Boolean[]{ false, true};
+
+    //MiniCluster cluster = MiniCluster.buildCluster();
+    @Test
+    public void testGroupCountWithMultipleFields() throws Throwable {
+        File tmpFile = File.createTempFile("test", "txt");
+        for (int k = 0; k < nullFlags.length; k++) {
+            System.err.println("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k]);
+            // flag to indicate if both the keys forming
+            // the group key are null
+            int groupKeyWithNulls = 0;
+            if(nullFlags[k] == false) {
+                // generate data with no nulls
+                PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+                for(int i = 0; i < LOOP_COUNT; i++) {
+                    for(int j=0; j< LOOP_COUNT; j++) {
+                            ps.println(i + "\t" + i + "\t" + j%2);
+                    }
+                }
+                ps.close();
+            } else {
+                // generate data with nulls                
+                PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+                Random r = new Random();
+                for(int i = 0; i < LOOP_COUNT; i++) {
+                    int rand = r.nextInt(LOOP_COUNT);
+                    if(rand <= (0.2 * LOOP_COUNT) ) {
+                        for(int j=0; j< LOOP_COUNT; j++) {
+                            ps.println("\t" + i + "\t" + j%2);
+                        }
+                    } else if (rand > (0.2 * LOOP_COUNT) && rand <= (0.4 * LOOP_COUNT)) {
+                        for(int j=0; j< LOOP_COUNT; j++) {
+                            ps.println(i + "\t" + "\t" + j%2);
+                        }
+                    } else if (rand > (0.4 * LOOP_COUNT) && rand <= (0.6 * LOOP_COUNT)) {
+                        for(int j=0; j< LOOP_COUNT; j++) {
+                            ps.println("\t" + "\t" + j%2);                            
+                        }
+                        groupKeyWithNulls++;
+                    } else {
+                        for(int j=0; j< LOOP_COUNT; j++) {
+                            ps.println(i + "\t" + i + "\t" + j%2);
+                        }
+                    }                    
+                }
+                ps.close();                
+            }
+            pig.registerQuery(" a = group (load '" + Util.generateURI(tmpFile.toString()) + "') by ($0,$1);");
+            pig.registerQuery("b = foreach a generate flatten(group), SUM($1.$2);");
+            Iterator<Tuple> it = pig.openIterator("b");
+            int count = 0;
+            System.err.println("XX Starting");
+            while(it.hasNext()){
+                Tuple t = it.next();
+            System.err.println("XX "+ t);
+                int sum = ((Double)t.get(2)).intValue();
+                // if the first two fields (output of flatten(group))
+                // are both nulls then we should change the sum accordingly
+                if(t.get(0) == null && t.get(1) == null)                
+                    assertEquals( "Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k],
+                             (LOOP_COUNT/2)*groupKeyWithNulls, sum);
+                else
+                    assertEquals("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k],
+                            LOOP_COUNT/2, sum);
+                    
+                count++;
+            }
+            System.err.println("XX done");
+            if(groupKeyWithNulls == 0)
+                assertEquals("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k], LOOP_COUNT, count);
+            else
+                assertEquals("Running testGroupCountWithMultipleFields with nullFlags set to " + nullFlags[k], LOOP_COUNT - groupKeyWithNulls + 1, count);
+            
+        }
+        tmpFile.delete();
+        
+    }
+    
+    @Test
+    public void testSimpleCount() throws Exception {
+        File tmpFile = File.createTempFile("test", "txt");
+        for (int i = 0; i < nullFlags.length; i++) {
+            System.err.println("Testing testSimpleCount with null flag:" + nullFlags[i]);
+        
+            PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+            int numNulls = generateInput(ps, nullFlags[i]);
+            String query = "myid =  foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "') all) generate COUNT($1);";
+            System.out.println(query);
+            pig.registerQuery(query);
+            Iterator it = pig.openIterator("myid");
+            tmpFile.delete();
+            Tuple t = (Tuple)it.next();
+            Long count = DataType.toLong(t.get(0));
+            assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], count.longValue(), LOOP_COUNT);
+        }
+    }
+
+    @Test
+    public void testGroupCount() throws Throwable {
+        File tmpFile = File.createTempFile("test", "txt");
+        for (int i = 0; i < nullFlags.length; i++) {
+            System.err.println("Testing testGroupCount with null flag:" + nullFlags[i]);
+        
+            PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+            int numNulls = generateInput(ps, nullFlags[i]);
+            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "') all) generate group, COUNT($1) ;";
+            System.out.println(query);
+            pig.registerQuery(query);
+            Iterator it = pig.openIterator("myid");
+            tmpFile.delete();
+            Tuple t = (Tuple)it.next();
+            Long count = DataType.toLong(t.get(1));
+            assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], count.longValue(), LOOP_COUNT);
+        }
+    }
+    
+    @Test
+    public void testGroupReorderCount() throws Throwable {
+        File tmpFile = File.createTempFile("test", "txt");
+        for (int i = 0; i < nullFlags.length; i++) {
+            System.err.println("Testing testGroupCount with null flag:" + nullFlags[i]);
+            PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+            int numNulls = generateInput(ps, nullFlags[i]);
+            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "') all) generate COUNT($1), group ;";
+            System.out.println(query);
+            pig.registerQuery(query);
+            Iterator it = pig.openIterator("myid");
+            tmpFile.delete();
+            Tuple t = (Tuple)it.next();
+            Long count = DataType.toLong(t.get(0));
+            assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], count.longValue(), LOOP_COUNT);
+        }
+    }
+
+
+
+    @Test
+    public void testGroupUniqueColumnCount() throws Throwable {
+        File tmpFile = File.createTempFile("test", "txt");
+        for (int i = 0; i < nullFlags.length; i++) {
+            PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+            long groupsize = 0;
+            if(nullFlags[i] == false) {
+                // generate data without nulls
+                for(int j = 0; j < LOOP_COUNT; j++) {
+                    if(j%10 == 0) groupsize++;
+                    ps.println(j%10 + ":" + j);
+                }
+            } else {
+                // generate data with nulls
+                for(int j = 0; j < LOOP_COUNT; j++) {
+                    if(j%10 == 0) groupsize++;
+                    if(j % 20 == 0) {
+                        // for half the groups
+                        // emit nulls
+                        ps.println(j%10 + ":");
+                    } else {
+                        ps.println(j%10 + ":" + j);
+                    }
+                }
+            }         
+            ps.close();
+            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "' using " + PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1) ;";
+            System.out.println(query);
+            pig.registerQuery(query);
+            Iterator it = pig.openIterator("myid");
+            tmpFile.delete();
+            System.err.println("Output from testGroupUniqueColumnCount");
+            while(it.hasNext()) {
+                Tuple t = (Tuple)it.next();
+                System.err.println(t);
+                String a = t.get(0).toString();
+                Double group = Double.valueOf(a.toString());
+                if(group == 0.0) {
+                    Long count = DataType.toLong(t.get(1));
+                    // right now count with nulls is same as
+                    // count without nulls
+                    assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i], groupsize, count.longValue());                    
+                }
+            }
+        }
+    }
+
+    @Test
+    public void testGroupDuplicateColumnCount() throws Throwable {
+        File tmpFile = File.createTempFile("test", "txt");
+        for (int i = 0; i < nullFlags.length; i++) {
+            PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+            long groupsize = 0;
+            if(nullFlags[i] == false) {
+                // generate data without nulls
+                for(int j = 0; j < LOOP_COUNT; j++) {
+                    if(j%10 == 0) groupsize++;
+                    ps.println(j%10 + ":" + j);
+                }
+            } else {
+                // generate data with nulls
+                for(int j = 0; j < LOOP_COUNT; j++) {
+                    if(j%10 == 0) groupsize++;
+                    if(j % 20 == 0) {
+                        // for half the groups
+                        // emit nulls
+                        ps.println(j%10 + ":");
+                    } else {
+                        ps.println(j%10 + ":" + j);
+                    }
+                }
+            }
+            ps.close();
+            String query = "myid = foreach (group (load '" + Util.generateURI(tmpFile.toString()) + "' using " + PigStorage.class.getName() + "(':')) by $0) generate group, COUNT($1.$1), COUNT($1.$0) ;";
+            System.out.println(query);
+            pig.registerQuery(query);
+            Iterator it = pig.openIterator("myid");
+            tmpFile.delete();
+            System.err.println("Output from testGroupDuplicateColumnCount");
+            while(it.hasNext()) {
+                Tuple t = (Tuple)it.next();
+                System.err.println(t);
+                String a = t.get(0).toString();
+                Double group = Double.valueOf(a.toString());
+                if(group == 0.0) {
+                    // right now count with nulls is same
+                    // as count without nulls
+                    Long count = DataType.toLong(t.get(2));
+                    assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],groupsize, count.longValue());
+                    count = DataType.toLong(t.get(1));
+                    assertEquals(this.getName() + "with nullFlags set to: " + nullFlags[i],groupsize, count.longValue());
+                }
+            }
+        }
+    }
+    
+    private int generateInput(PrintStream ps, boolean withNulls ) {
+        int numNulls = 0;
+        if(withNulls) {
+            // inject nulls randomly
+            for(int i = 0; i < LOOP_COUNT; i++) {
+                int rand = new Random().nextInt(LOOP_COUNT);
+                if(rand <= (0.3 * LOOP_COUNT) ) {
+                    ps.println(":");
+                    numNulls++;
+                } else {
+                    ps.println(i + ":" + i);
+                }
+            }
+        } else {
+            for(int i = 0; i < LOOP_COUNT; i++) {
+                ps.println(i + ":" + i);
+            }
+        }
+        ps.close();
+        return numNulls;
+    }
+
+}