
svn commit: r828825 - in /hadoop/pig/trunk: ./ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ sr...

Author: gates
Date: Thu Oct 22 19:32:22 2009
New Revision: 828825

URL: http://svn.apache.org/viewvc?rev=828825&view=rev
Log:
PIG-984:  Add map side grouping for data that is already collected when it is read into the map.
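
For reference, a minimal usage sketch in the same PigServer idiom as the new
TestCollectedGroup added below. The file name and schema are illustrative,
and the hint assumes the input is already collected by key within each map
input (see the POCollectedGroup javadoc below); this is not verified:

    import java.util.Iterator;
    import org.apache.pig.ExecType;
    import org.apache.pig.PigServer;
    import org.apache.pig.data.Tuple;

    PigServer pig = new PigServer(ExecType.MAPREDUCE);
    // 'input' and the (id, name, grade) schema are illustrative assumptions
    pig.registerQuery("A = load 'input' as (id, name, grade);");
    // the new hint: perform the group on the map side instead of shuffling
    pig.registerQuery("B = group A by id using \"collected\";");
    pig.registerQuery("C = foreach B generate group, COUNT(A);");
    Iterator<Tuple> it = pig.openIterator("C");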

Added:
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
Modified:
    hadoop/pig/trunk/CHANGES.txt
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt

Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Thu Oct 22 19:32:22 2009
@@ -26,6 +26,9 @@
 
 IMPROVEMENTS
 
+PIG-984:  Add map side grouping for data that is already collected when
+it is read into the map (rding via gates).
+
 PIG-1025: Add ability to set job priority from Pig Latin script (kevinweil via
 gates)
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MRCompiler.java Thu Oct 22 19:32:22 2009
@@ -79,6 +79,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.impl.plan.CompilationMessageCollector;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -900,7 +901,22 @@
             throw new MRCompilerException(msg, errCode, PigException.BUG, e);
         }
     }
-    
+
+    public void visitCollectedGroup(POCollectedGroup op) throws VisitorException {
+        try{
+            nonBlocking(op);
+            List<PhysicalPlan> plans = op.getPlans();
+            if(plans!=null)
+                for(PhysicalPlan ep : plans)
+                    addUDFs(ep);
+            phyToMROpMap.put(op, curMROp);
+        }catch(Exception e){
+            int errCode = 2034;
+            String msg = "Error compiling operator " + op.getClass().getSimpleName();
+            throw new MRCompilerException(msg, errCode, PigException.BUG, e);
+        }
+    }
+
     @Override
     public void visitPOForEach(POForEach op) throws VisitorException{
         try{

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Oct 22 19:32:22 2009
@@ -63,6 +63,12 @@
     }
 
     @Override
+    public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
+        super.visitCollectedGroup(mg);
+        mg.setParentPlan(parent);
+    }
+
+    @Override
     public void visitGlobalRearrange(POGlobalRearrange gr) throws VisitorException{
         gr.setParentPlan(parent);
     }

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Thu Oct 22 19:32:22 2009
@@ -23,6 +23,7 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 
@@ -80,6 +81,13 @@
             // merge join present
             endOfAllInputFlag = true;
         }
+       
+        @Override
+        public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
+            // map side group present
+            endOfAllInputFlag = true;
+        }
+
         /**
          * @return if end of all input is present
          */
@@ -87,4 +95,5 @@
             return endOfAllInputFlag;
         }
     }
-}
\ No newline at end of file
+}
+

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/LogToPhyTranslationVisitor.java Thu Oct 22 19:32:22 2009
@@ -645,13 +645,23 @@
     
     @Override
     public void visit(LOCogroup cg) throws VisitorException {
-        boolean currentPhysicalPlan = false;
+            
+        if (cg.getGroupType() == LOCogroup.GROUPTYPE.COLLECTED) {
+
+            translateCollectedCogroup(cg);
+
+        } else {
+            
+            translateRegularCogroup(cg);
+        }
+    }
+    
+    private void translateRegularCogroup(LOCogroup cg) throws VisitorException {
         String scope = cg.getOperatorKey().scope;
         List<LogicalOperator> inputs = cg.getInputs();
-
+        
         POGlobalRearrange poGlobal = new POGlobalRearrange(new OperatorKey(
-                scope, nodeGen.getNextNodeId(scope)), cg
-                .getRequestedParallelism());
+                scope, nodeGen.getNextNodeId(scope)), cg.getRequestedParallelism());
         POPackage poPackage = new POPackage(new OperatorKey(scope, nodeGen
                 .getNextNodeId(scope)), cg.getRequestedParallelism());
 
@@ -669,8 +679,7 @@
         int count = 0;
         Byte type = null;
         for (LogicalOperator op : inputs) {
-            List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans()
-                    .get(op);
+            List<LogicalPlan> plans = (List<LogicalPlan>)cg.getGroupByPlans().get(op);
             POLocalRearrange physOp = new POLocalRearrange(new OperatorKey(
                     scope, nodeGen.getNextNodeId(scope)), cg
                     .getRequestedParallelism());
@@ -682,9 +691,8 @@
                         .spawnChildWalker(lp);
                 pushWalker(childWalker);
                 mCurrentWalker.walk(this);
-                exprPlans.add((PhysicalPlan) currentPlan);
+                exprPlans.add(currentPlan);
                 popWalker();
-
             }
             currentPlan = currentPlans.pop();
             try {
@@ -697,8 +705,8 @@
             try {
                 physOp.setIndex(count++);
             } catch (ExecException e1) {
-            	int errCode = 2058;
-            	String msg = "Unable to set index on newly create POLocalRearrange.";
+                int errCode = 2058;
+                String msg = "Unable to set index on newly create POLocalRearrange.";
                 throw new VisitorException(msg, errCode, PigException.BUG, e1);
             }
             if (plans.size() > 1) {
@@ -720,8 +728,8 @@
                 String msg = "Invalid physical operators in the physical plan" ;
                 throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
             }
-
         }
+        
         poPackage.setKeyType(type);
         poPackage.setResultType(DataType.TUPLE);
         poPackage.setNumInps(count);
@@ -729,6 +737,59 @@
         logToPhyMap.put(cg, poPackage);
     }
     
+    private void translateCollectedCogroup(LOCogroup cg) throws VisitorException {
+        String scope = cg.getOperatorKey().scope;
+        List<LogicalOperator> inputs = cg.getInputs();
+        
+        // can have only one input
+        LogicalOperator op = inputs.get(0);
+        List<LogicalPlan> plans = (List<LogicalPlan>) cg.getGroupByPlans().get(op);
+        POCollectedGroup physOp = new POCollectedGroup(new OperatorKey(
+                scope, nodeGen.getNextNodeId(scope)));
+        
+        List<PhysicalPlan> exprPlans = new ArrayList<PhysicalPlan>();
+        currentPlans.push(currentPlan);
+        for (LogicalPlan lp : plans) {
+            currentPlan = new PhysicalPlan();
+            PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
+                mCurrentWalker.spawnChildWalker(lp);
+            pushWalker(childWalker);
+            mCurrentWalker.walk(this);
+            exprPlans.add(currentPlan);
+            popWalker();
+        }
+        currentPlan = currentPlans.pop();
+        
+        try {
+            physOp.setPlans(exprPlans);
+        } catch (PlanException pe) {
+            int errCode = 2071;
+            String msg = "Problem with setting up map group's plans.";
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, pe);
+        }
+        Byte type = null;
+        if (plans.size() > 1) {
+            type = DataType.TUPLE;
+            physOp.setKeyType(type);
+        } else {
+            type = exprPlans.get(0).getLeaves().get(0).getResultType();
+            physOp.setKeyType(type);
+        }
+        physOp.setResultType(DataType.TUPLE);
+
+        currentPlan.add(physOp);
+              
+        try {
+            currentPlan.connect(logToPhyMap.get(op), physOp);
+        } catch (PlanException e) {
+            int errCode = 2015;
+            String msg = "Invalid physical operators in the physical plan" ;
+            throw new LogicalToPhysicalTranslatorException(msg, errCode, PigException.BUG, e);
+        }
+
+        logToPhyMap.put(cg, physOp);
+    }
+    
 	@Override
 	protected void visit(LOJoin loj) throws VisitorException {
 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Oct 22 19:32:22 2009
@@ -59,6 +59,15 @@
         visit();
         popWalker();
     }
+ 
+    public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
+        List<PhysicalPlan> inpPlans = mg.getPlans();
+        for (PhysicalPlan plan : inpPlans) {
+            pushWalker(mCurrentWalker.spawnChildWalker(plan));
+            visit();
+            popWalker();
+        }
+    }
     
     public void visitLocalRearrange(POLocalRearrange lr) throws VisitorException{
         List<PhysicalPlan> inpPlans = lr.getPlans();

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PlanPrinter.java Thu Oct 22 19:32:22 2009
@@ -156,6 +156,9 @@
           else if(node instanceof POLocalRearrange){
             sb.append(planString(((POLocalRearrange)node).getPlans()));
           }
+          else if(node instanceof POCollectedGroup){
+            sb.append(planString(((POCollectedGroup)node).getPlans()));
+          }
           else if(node instanceof POSort){
             sb.append(planString(((POSort)node).getSortPlans())); 
           }

Added: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=828825&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Thu Oct 22 19:32:22 2009
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ExpressionOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.data.TupleFactory;
+import org.apache.pig.impl.io.PigNullableWritable;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * The collected group operator is a special operator used when users give
+ * the hint 'using "collected"' in a group by clause. It implements a map-side
+ * group that collects all records for a given key into a buffer. When it
+ * sees a key change, it emits the key and the bag of records it has buffered.
+ * It assumes that all records for a given key are collected together in the
+ * input, so there is no need to buffer across keys.
+ *
+ */
+public class POCollectedGroup extends PhysicalOperator {
+
+    private static final List<PhysicalPlan> EMPTY_PLAN_LIST = new ArrayList<PhysicalPlan>();
+
+    protected static final long serialVersionUID = 1L;
+
+    protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
+
+//    private Log log = LogFactory.getLog(getClass());
+
+    protected List<PhysicalPlan> plans;
+    
+    protected List<ExpressionOperator> leafOps;
+
+    protected byte keyType;
+
+    private Tuple output;
+
+    private DataBag outputBag = null;
+    
+    private Object prevKey = null;
+    
+    public POCollectedGroup(OperatorKey k) {
+        this(k, -1, null);
+    }
+
+    public POCollectedGroup(OperatorKey k, int rp) {
+        this(k, rp, null);
+    }
+
+    public POCollectedGroup(OperatorKey k, List<PhysicalOperator> inp) {
+        this(k, -1, inp);
+    }
+
+    public POCollectedGroup(OperatorKey k, int rp, List<PhysicalOperator> inp) {
+        super(k, rp, inp);
+        leafOps = new ArrayList<ExpressionOperator>();
+        output = mTupleFactory.newTuple(2);
+    }
+
+    @Override
+    public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitCollectedGroup(this);
+    }
+
+    @Override
+    public String name() {
+        return "Map side group " + "[" + DataType.findTypeName(resultType) +
+            "]" + "{" + DataType.findTypeName(keyType) + "}" + " - " + mKey.toString();
+    }
+
+    @Override
+    public boolean supportsMultipleInputs() {
+        return false;
+    }
+
+    @Override
+    public boolean supportsMultipleOutputs() {
+        return false;
+    }
+
+    /**
+     * Overridden since the attachment of the new input should cause the old
+     * processing to end.
+     */
+    @Override
+    public void attachInput(Tuple t) {
+        super.attachInput(t);
+    }
+    
+    @SuppressWarnings("unchecked")
+    @Override
+    public Result getNext(Tuple t) throws ExecException {
+    
+        // Since the output is buffered, we need to flush the last
+        // set of records when the close method is called by mapper. 
+        if (this.parentPlan.endOfAllInput) {
+            if (outputBag != null) {
+                Tuple tup = mTupleFactory.newTuple(2);
+                tup.set(0, prevKey);
+                tup.set(1, outputBag);
+                outputBag = null;
+                return new Result(POStatus.STATUS_OK, tup);
+            } 
+                
+            return new Result(POStatus.STATUS_EOP, null);
+        }
+
+        Result inp = null;
+        Result res = null;
+
+        while (true) {
+            inp = processInput();
+            if (inp.returnStatus == POStatus.STATUS_EOP || 
+                    inp.returnStatus == POStatus.STATUS_ERR) {
+                break;
+            }
+            
+            if (inp.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            }
+            
+            for (PhysicalPlan ep : plans) {
+                ep.attachInput((Tuple)inp.result);
+            }
+            
+            List<Result> resLst = new ArrayList<Result>();
+            for (ExpressionOperator op : leafOps) {
+                
+                switch (op.getResultType()){
+                case DataType.BAG:
+                    res = op.getNext(dummyBag);
+                    break;
+                case DataType.BOOLEAN:
+                    res = op.getNext(dummyBool);
+                    break;
+                case DataType.BYTEARRAY:
+                    res = op.getNext(dummyDBA);
+                    break;
+                case DataType.CHARARRAY:
+                    res = op.getNext(dummyString);
+                    break;
+                case DataType.DOUBLE:
+                    res = op.getNext(dummyDouble);
+                    break;
+                case DataType.FLOAT:
+                    res = op.getNext(dummyFloat);
+                    break;
+                case DataType.INTEGER:
+                    res = op.getNext(dummyInt);
+                    break;
+                case DataType.LONG:
+                    res = op.getNext(dummyLong);
+                    break;
+                case DataType.MAP:
+                    res = op.getNext(dummyMap);
+                    break;
+                case DataType.TUPLE:
+                    res = op.getNext(dummyTuple);
+                    break;
+                }
+                if (res.returnStatus != POStatus.STATUS_OK) {
+                    return new Result();
+                }
+                resLst.add(res);
+            }
+            
+            Tuple tup = constructOutput(resLst,(Tuple)inp.result);
+            Object curKey = tup.get(0);
+
+            // the first time, just create a new buffer and continue.
+            if (prevKey == null && outputBag == null) {
+                prevKey = curKey;
+                outputBag = BagFactory.getInstance().newDefaultBag();
+                outputBag.add((Tuple)tup.get(1));
+                continue;
+            }
+            
+            // no key change
+            if (prevKey == null && curKey == null) {
+                outputBag.add((Tuple)tup.get(1));
+                continue;
+            }
+            
+            // no key change
+            if (prevKey != null && curKey != null && ((Comparable)curKey).compareTo(prevKey) == 0) {
+                outputBag.add((Tuple)tup.get(1));
+                continue;
+            } 
+            
+            // key change
+            Tuple tup2 = mTupleFactory.newTuple(2);
+            tup2.set(0, prevKey);
+            tup2.set(1, outputBag);
+            res.result = tup2;
+               
+            prevKey = curKey;
+            outputBag = BagFactory.getInstance().newDefaultBag();
+            outputBag.add((Tuple)tup.get(1));
+
+            return res;
+        }
+
+        return inp;
+    }
+    
+    protected Tuple constructOutput(List<Result> resLst, Tuple value) throws ExecException{
+        
+        // Construct key
+        Object key;
+        
+        if (resLst.size() > 1) {
+            Tuple t = mTupleFactory.newTuple(resLst.size());
+            int i = -1;
+            for (Result res : resLst) {
+                t.set(++i, res.result);
+            }
+            key = t;           
+        } 
+        else {
+            key = resLst.get(0).result;
+        }
+        
+        // Put key and value in a tuple and return
+        output.set(0, key);
+        output.set(1, value);
+                
+        return output;
+    }
+
+    public byte getKeyType() {
+        return keyType;
+    }
+
+    public void setKeyType(byte keyType) {
+        this.keyType = keyType;
+    }
+
+    public List<PhysicalPlan> getPlans() {
+        return (plans == null) ? EMPTY_PLAN_LIST : plans;
+    }
+
+    public void setPlans(List<PhysicalPlan> plans) throws PlanException {
+        this.plans = plans;
+        leafOps.clear();
+        for (PhysicalPlan plan : plans) {
+            ExpressionOperator leaf = (ExpressionOperator)plan.getLeaves().get(0); 
+            leafOps.add(leaf);
+        }            
+   }
+}
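
In miniature, the operator's contract is: buffer records while the key is
unchanged, emit a (key, bag) pair on each key change, and flush the last
buffered group on the endOfAllInput pass (which is why EndOfAllInputSetter
above flags plans containing this operator). A simplified, self-contained
model of that algorithm, with plain Java types standing in for Pig's Tuple
and DataBag; this is not the actual operator:

    import java.util.AbstractMap;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.Map;

    public class CollectedGroupModel {

        // Emits one (key, group) pair per contiguous run of equal keys and
        // flushes the final buffered group once input is exhausted; this is
        // the role played by parentPlan.endOfAllInput in the real operator.
        static List<Map.Entry<String, List<String>>> group(List<String[]> records) {
            List<Map.Entry<String, List<String>>> out = new ArrayList<>();
            String prevKey = null;
            List<String> bag = null;
            for (String[] rec : records) {
                String key = rec[0];
                if (bag != null && !key.equals(prevKey)) {
                    // key change: emit the buffered group
                    out.add(new AbstractMap.SimpleEntry<>(prevKey, bag));
                    bag = null;
                }
                if (bag == null) {
                    bag = new ArrayList<>();
                }
                bag.add(rec[1]);
                prevKey = key;
            }
            if (bag != null) {
                // end of all input: flush the last group
                out.add(new AbstractMap.SimpleEntry<>(prevKey, bag));
            }
            return out;
        }

        public static void main(String[] args) {
            // input already collected by key, as the operator assumes
            List<String[]> input = Arrays.asList(
                new String[] {"100", "apple1"},
                new String[] {"100", "apple2"},
                new String[] {"200", "orange1"},
                new String[] {"300", "strawberry"});
            // prints 100=[apple1, apple2], 200=[orange1], 300=[strawberry]
            for (Map.Entry<String, List<String>> e : group(input)) {
                System.out.println(e.getKey() + "=" + e.getValue());
            }
        }
    }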

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOCogroup.java Thu Oct 22 19:32:22 2009
@@ -18,6 +18,7 @@
 package org.apache.pig.impl.logicalLayer;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
@@ -43,6 +44,14 @@
 
 public class LOCogroup extends RelationalOperator {
     private static final long serialVersionUID = 2L;
+   
+    /**
+     * Enum for the type of group
+     */
+    public static enum GROUPTYPE {
+        REGULAR,    // Regular (co)group
+        COLLECTED   // Collected group
+    };
 
     /**
      * Cogroup contains a list of logical operators corresponding to the
@@ -53,6 +62,7 @@
     private boolean[] mIsInner;
     private static Log log = LogFactory.getLog(LOCogroup.class);
     private MultiMap<LogicalOperator, LogicalPlan> mGroupByPlans;
+    private GROUPTYPE mGroupType;
 
     /**
      * 
@@ -70,9 +80,34 @@
             OperatorKey k,
             MultiMap<LogicalOperator, LogicalPlan> groupByPlans,
             boolean[] isInner) {
+        this(plan, k, groupByPlans, GROUPTYPE.REGULAR, isInner);
+    }
+   
+    /**
+     * 
+     * @param plan
+     *            LogicalPlan this operator is a part of.
+     * @param k
+     *            OperatorKey for this operator
+     * @param groupByPlans
+     *            the group by columns
+     * @param type
+     *            the type of this group           
+     * @param isInner
+     *            indicates whether the cogroup is inner for each relation
+     */
+    public LOCogroup(
+            LogicalPlan plan,
+            OperatorKey k,
+            MultiMap<LogicalOperator, LogicalPlan> groupByPlans,
+            GROUPTYPE type,
+            boolean[] isInner) {
         super(plan, k);
         mGroupByPlans = groupByPlans;
-        mIsInner = isInner;
+        if (isInner != null) {
+            mIsInner = Arrays.copyOf(isInner, isInner.length);
+        }
+        mGroupType = type;
     }
 
     public List<LogicalOperator> getInputs() {
@@ -95,6 +130,10 @@
         mIsInner = inner;
     }
 
+    public GROUPTYPE getGroupType() {
+        return mGroupType;
+    }
+
     @Override
     public String name() {
         return "CoGroup " + mKey.scope + "-" + mKey.id;

Modified: hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt?rev=828825&r1=828824&r2=828825&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt (original)
+++ hadoop/pig/trunk/src/org/apache/pig/impl/logicalLayer/parser/QueryParser.jjt Thu Oct 22 19:32:22 2009
@@ -249,7 +249,7 @@
         return fname;
     }
 	
-	LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp) throws ParseException, PlanException{
+	LogicalOperator parseCogroup(ArrayList<CogroupInput> gis, LogicalPlan lp, LOCogroup.GROUPTYPE type) throws ParseException, PlanException{
 		
 		log.trace("Entering parseCogroup");
 		log.debug("LogicalPlan: " + lp);
@@ -286,7 +286,7 @@
 			isInner[i] = gi.isInner;
 		}
 		
-		LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), groupByPlans, isInner);
+		LogicalOperator cogroup = new LOCogroup(lp, new OperatorKey(scope, getNextId()), groupByPlans, type, isInner);
 		lp.add(cogroup);
 		log.debug("Added operator " + cogroup.getClass().getName() + " object " + cogroup + " to the logical plan " + lp);
 		
@@ -388,7 +388,7 @@
         for (int i = 0; i < n; i++) {
 			(gis.get(i)).isInner = true;
         }
-		LogicalOperator cogroup = parseCogroup(gis, lp);
+		LogicalOperator cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
 		lp.add(cogroup);
 		log.debug("Added operator " + cogroup.getClass().getName() + " to the logical plan");
 		
@@ -676,7 +676,21 @@
         }
         log.trace("Exiting attachPlan");
     }
-	
+
+    boolean isColumnProjectionsOrStar(CogroupInput cgi) {
+        if (cgi == null || cgi.plans == null || cgi.plans.size() == 0) {
+            return false;
+        }
+        for (LogicalPlan keyPlan: cgi.plans) {
+            for (LogicalOperator op : keyPlan) {
+                if(!(op instanceof LOProject)) {
+                    return false;
+                }
+            }
+        }
+        return true;    
+    }
+
 }
 
 
@@ -1623,20 +1637,40 @@
 
 LogicalOperator CogroupClause(LogicalPlan lp) : 
 {
-	CogroupInput gi; 
-	ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); 
-	LogicalOperator cogroup; 
-	log.trace("Entering CoGroupClause");
+    CogroupInput gi; 
+    ArrayList<CogroupInput> gis = new ArrayList<CogroupInput>(); 
+    LogicalOperator cogroup = null; 
+    log.trace("Entering CoGroupClause");
 }
 {
 
-	(gi = GroupItem(lp) { gis.add(gi); }
-	("," gi = GroupItem(lp) { gis.add(gi); })*)
-	{
-		cogroup = parseCogroup(gis, lp);
-		log.trace("Exiting CoGroupClause");
-		return cogroup;		
-	}
+    (gi = GroupItem(lp) { gis.add(gi); }
+        ("," gi = GroupItem(lp) { gis.add(gi); })*
+        (
+            [<USING> ("\"collected\"" { 
+                if (gis.size() != 1) {
+                    throw new ParseException("Collected group is only supported for single input");  
+                }
+                if (!isColumnProjectionsOrStar(gis.get(0))) {
+                    throw new ParseException("Collected group is only supported for columns or star projection");
+                }
+                cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.COLLECTED);
+                }
+                )
+            ]                                                                        
+        )
+    )
+
+    {
+        if (cogroup != null) {
+            log.trace("Exiting CoGroupClause");
+            return cogroup;
+        }
+
+        cogroup = parseCogroup(gis, lp, LOCogroup.GROUPTYPE.REGULAR);
+        log.trace("Exiting CoGroupClause");
+        return cogroup;		
+    }
 
 }
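
For quick reference, the guards added above restrict the hint to a single
input whose group keys are plain column projections or star; the tests added
below exercise exactly these cases. A sketch of accepted and rejected forms,
continuing the PigServer snippet after the commit log (pig and relation A are
assumed set up; each rejected query throws when registered):

    // accepted: single input, plain column or star projections
    pig.registerQuery("B = group A by id using \"collected\";");
    pig.registerQuery("B = group A by (id, name) using \"collected\";");
    pig.registerQuery("B = group A by * using \"collected\";");
    // rejected with a ParseException by the checks above
    pig.registerQuery("C = group A by id, B by id using \"collected\";"); // single input only
    pig.registerQuery("D = group A all using \"collected\";");            // group all is not a projection
    pig.registerQuery("E = group A by id*grade using \"collected\";");    // expression key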
 

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java?rev=828825&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestCollectedGroup.java Thu Oct 22 19:32:22 2009
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.*;
+import java.util.Iterator;
+import java.util.List;
+
+import junit.framework.Assert;
+import junit.framework.TestCase;
+
+import org.apache.pig.EvalFunc;
+import org.apache.pig.ExecType;
+import org.apache.pig.FuncSpec;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.test.utils.TestHelper;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.plan.OperatorKey;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestCollectedGroup extends TestCase {
+    private static final String INPUT_FILE = "MapSideGroupInput.txt";
+    
+    private PigServer pigServer;
+    private MiniCluster cluster = MiniCluster.buildCluster();
+
+    public TestCollectedGroup() throws ExecException, IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+    }
+    
+    @Before
+    public void setUp() throws Exception {
+        createFiles();
+    }
+
+    private void createFiles() throws IOException {
+        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
+        w.println("100\tapple1\t95");
+        w.println("100\tapple2\t83");
+        w.println("100\tapple2\t74");
+        w.println("200\torange1\t100");
+        w.println("200\torange2\t89");
+        w.println("300\tstrawberry\t64");      
+        w.println("300\tstrawberry\t64");      
+        w.println("300\tstrawberry\t76");      
+        w.println("400\tpear\t78");
+        w.close();
+        
+        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
+    }
+    
+    @After
+    public void tearDown() throws Exception {
+        new File(INPUT_FILE).delete();
+        Util.deleteFile(cluster, INPUT_FILE);
+    }
+    
+    public void testPOMapsideGroupNoNullPlans() throws IOException {
+        POCollectedGroup pmg = new POCollectedGroup(new OperatorKey());
+        List<PhysicalPlan> plans = pmg.getPlans();
+
+        Assert.assertTrue(plans != null);
+        Assert.assertTrue(plans.size() == 0);
+    }      
+     
+    public void testMapsideGroupParserNoSupportForMultipleInputs() throws IOException {
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+        pigServer.registerQuery("B = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+    
+        try {
+            pigServer.registerQuery("C = group A by id, B by id using \"collected\";");
+            fail("Pig doesn't support multi-input collected group.");
+        } catch (Exception e) {
+             Assert.assertEquals(e.getMessage(), 
+                "Error during parsing. Collected group is only supported for single input");
+        }
+    }
+    
+    public void testMapsideGroupParserNoSupportForGroupAll() throws IOException {
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+    
+        try {
+            pigServer.registerQuery("B = group A all using \"collected\";");
+            fail("Pig doesn't support collected group all.");
+        } catch (Exception e) {
+            Assert.assertEquals(e.getMessage(), 
+                "Error during parsing. Collected group is only supported for columns or star projection");
+        }
+    }
+     
+    public void testMapsideGroupParserNoSupportForByExpression() throws IOException {
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+    
+        try {
+            pigServer.registerQuery("B = group A by id*grade using \"collected\";");
+            fail("Pig doesn't support collected group by expression.");
+        } catch (Exception e) {
+            Assert.assertEquals(e.getMessage(), 
+                "Error during parsing. Collected group is only supported for columns or star projection");
+        }
+    }
+
+    public void testMapsideGroupByOneColumn() throws IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+    
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("B = group A by id using \"collected\";");
+                pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+
+                while (iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+                pigServer.registerQuery("D = group A by id;");
+                pigServer.registerQuery("E = foreach D generate group, COUNT(A);");
+                Iterator<Tuple> iter = pigServer.openIterator("E");
+
+                while (iter.hasNext()) {
+                    dbshj.add(iter.next());
+                }
+            }
+            Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+            Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+ 
+    public void testMapsideGroupByMultipleColumns() throws IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+    
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("B = group A by (id, name) using \"collected\";");
+                pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+
+                while (iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+                pigServer.registerQuery("D = group A by (id, name);");
+                pigServer.registerQuery("E = foreach D generate group, COUNT(A);");
+                Iterator<Tuple> iter = pigServer.openIterator("E");
+
+                while (iter.hasNext()) {
+                    dbshj.add(iter.next());
+                }
+            }
+            Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+            Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+  
+    public void testMapsideGroupByStar() throws IOException{
+        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+
+        pigServer.registerQuery("A = LOAD '" + INPUT_FILE + "' as (id, name, grade);");
+    
+        try {
+            DataBag dbfrj = BagFactory.getInstance().newDefaultBag();
+            DataBag dbshj = BagFactory.getInstance().newDefaultBag();
+            {
+                pigServer.registerQuery("B = group A by * using \"collected\";");
+                pigServer.registerQuery("C = foreach B generate group, COUNT(A);");
+                Iterator<Tuple> iter = pigServer.openIterator("C");
+
+                while (iter.hasNext()) {
+                    dbfrj.add(iter.next());
+                }
+            }
+            {
+                pigServer.registerQuery("D = group A by *;");
+                pigServer.registerQuery("E = foreach D generate group, COUNT(A);");
+                Iterator<Tuple> iter = pigServer.openIterator("E");
+
+                while (iter.hasNext()) {
+                    dbshj.add(iter.next());
+                }
+            }
+            Assert.assertTrue(dbfrj.size()>0 && dbshj.size()>0);
+            Assert.assertEquals(true, TestHelper.compareBags(dbfrj, dbshj));
+
+        } catch (Exception e) {
+            fail(e.getMessage());
+        }
+    }
+
+}